This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-7258 in repository https://gitbox.apache.org/repos/asf/geode.git
commit bea2792f1c2fb1f53a812f80d1e21f850e6f34da Author: zhouxh <[email protected]> AuthorDate: Fri Oct 18 10:27:46 2019 -0700 GEODE-7258: The function retry logic is modified to handle exception thrown, while trying to connect to a server thats shutdown/closed. Co-authored-by: Anil <[email protected]> Co-authored-by: Xiaojian Zhou <[email protected]> --- .../cache/client/internal/ExecuteFunctionOp.java | 10 +- .../client/internal/ExecuteRegionFunctionOp.java | 15 +- .../internal/ExecuteRegionFunctionSingleHopOp.java | 3 +- .../geode/cache/client/internal/PoolImpl.java | 24 + .../cache/client/internal/ServerRegionProxy.java | 8 +- .../client/internal/SingleHopClientExecutor.java | 17 +- .../internal/pooling/ConnectionManagerImpl.java | 4 +- .../internal/ExecuteFunctionOpRetryTest.java | 5 +- .../internal/ExecuteFunctionTestSupport.java | 8 +- .../internal/ExecuteRegionFunctionOpRetryTest.java | 5 +- .../ExecuteRegionFunctionSingleHopOpRetryTest.java | 9 +- .../geode/cache/client/internal/PoolImplTest.java | 146 ++++ .../LuceneSearchWithRollingUpgradeDUnit.java | 813 +-------------------- ...=> LuceneSearchWithRollingUpgradeTestBase.java} | 262 ++----- ...ultAfterTwoLocatorsWithTwoServersAreRolled.java | 4 +- ...tAndServersAreRestartedFromCurrentVersion.java} | 55 +- ...tResultsAfterClientAndServersAreRolledOver.java | 9 +- ...ntAndServersAreRolledOverAllBucketsCreated.java | 7 +- 18 files changed, 326 insertions(+), 1078 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java index 09f926c..91baa4d 100755 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java @@ -24,7 +24,6 @@ import java.util.function.Supplier; import org.apache.logging.log4j.Logger; import org.apache.geode.InternalGemFireError; -import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.ServerConnectivityException; import org.apache.geode.cache.client.ServerOperationException; import org.apache.geode.cache.execute.Function; @@ -83,8 +82,8 @@ public class ExecuteFunctionOp { } else { boolean reexecute = false; + int maxRetryAttempts = -1; - int maxRetryAttempts = pool.getRetryAttempts(); if (!isHA) { maxRetryAttempts = 0; } @@ -107,11 +106,8 @@ public class ExecuteFunctionOp { } catch (ServerConnectivityException se) { - if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) { - // If the retryAttempt is set to default(-1). Try it on all servers once. - // Calculating number of servers when function is re-executed as it involves - // messaging locator. - maxRetryAttempts = pool.getConnectionSource().getAllServers().size() - 1; + if (maxRetryAttempts == -1) { + maxRetryAttempts = pool.calculateRetryAttempts(se); } if ((maxRetryAttempts--) < 1) { diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java index eba38a6..776ea2e 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java @@ -27,7 +27,6 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.InternalGemFireError; import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.client.NoAvailableServersException; -import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.ServerConnectivityException; import org.apache.geode.cache.client.ServerOperationException; import org.apache.geode.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl; @@ -67,17 +66,14 @@ public class ExecuteRegionFunctionOp { /** * Does a execute Function on a server using connections from the given pool to communicate with * the server. - * - * @param pool the pool to use to communicate with the server. - * @param resultCollector is used to collect the results from the Server - * @param maxRetryAttempts Maximum number of retry attempts */ static void execute(ExecutablePool pool, ResultCollector resultCollector, - int maxRetryAttempts, boolean isHA, + int retryAttempts, boolean isHA, ExecuteRegionFunctionOpImpl op, boolean isReexecute, Set<String> failedNodes) { + int maxRetryAttempts = retryAttempts > 0 ? retryAttempts : -1; if (!isHA) { maxRetryAttempts = 0; } @@ -107,11 +103,8 @@ public class ExecuteRegionFunctionOp { throw failedException; } catch (ServerConnectivityException se) { - if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) { - // If the retryAttempt is set to default(-1). Try it on all servers once. - // Calculating number of servers when function is re-executed as it involves - // messaging locator. - maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1; + if (maxRetryAttempts == -1) { + maxRetryAttempts = ((PoolImpl) pool).calculateRetryAttempts(se); } if ((maxRetryAttempts--) < 1) { diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java index eb162829..8fa8510 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java @@ -66,7 +66,6 @@ public class ExecuteRegionFunctionSingleHopOp { ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector resultCollector, Map<ServerLocation, ? extends HashSet> serverToFilterMap, - int mRetryAttempts, boolean isHA, final java.util.function.Function<ServerRegionFunctionExecutor, AbstractOp> regionFunctionSingleHopOpFunction, final Supplier<AbstractOp> executeRegionFunctionOpSupplier) { @@ -87,7 +86,7 @@ public class ExecuteRegionFunctionSingleHopOp { final int retryAttempts = SingleHopClientExecutor.submitAllHA(callableTasks, (LocalRegion) region, isHA, - resultCollector, failedNodes, mRetryAttempts, ((PoolImpl) pool)); + resultCollector, failedNodes, ((PoolImpl) pool)); if (isDebugEnabled) { logger.debug( diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java index 3dbd99f..bb12253 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java @@ -41,6 +41,7 @@ import org.apache.geode.cache.NoSubscriptionServersAvailableException; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionService; import org.apache.geode.cache.client.Pool; +import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.ServerConnectivityException; import org.apache.geode.cache.client.SubscriptionNotEnabledException; import org.apache.geode.cache.client.internal.pooling.ConnectionManager; @@ -1581,4 +1582,27 @@ public class PoolImpl implements InternalPool { public int getSubscriptionTimeoutMultiplier() { return subscriptionTimeoutMultiplier; } + + public int calculateRetryAttempts(Throwable cause) { + + int maxRetryAttempts = getRetryAttempts(); + + if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) { + // If the retryAttempt is set to default(-1). Try executing on all servers once. + // As calculating number of servers involves sending message to locator, it is + // done only when there is an exception. + if (cause instanceof ServerConnectivityException + && cause.getMessage().contains("Could not create a new connection")) { + // The client was unable to establish a connection before sending the + // request. + maxRetryAttempts = getConnectionSource().getAllServers().size(); + } else { + // The request was sent once. + maxRetryAttempts = getConnectionSource().getAllServers().size() - 1; + } + } + + return maxRetryAttempts; + } + } diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java index 9481cb4..cd15529 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java @@ -699,7 +699,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc hasResult, emptySet(), true, timeoutMs); ExecuteRegionFunctionSingleHopOp.execute(pool, region, serverRegionExecutor, - resultCollector, serverToBuckets, retryAttempts, function.isHA(), + resultCollector, serverToBuckets, function.isHA(), regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier); } } else { @@ -725,7 +725,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc hasResult, emptySet(), isBucketFilter, timeoutMs); ExecuteRegionFunctionSingleHopOp.execute(pool, region, - serverRegionExecutor, resultCollector, serverToFilterMap, retryAttempts, + serverRegionExecutor, resultCollector, serverToFilterMap, function.isHA(), regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier); } @@ -786,7 +786,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc emptySet(), true, isHA, optimizeForWrite, timeoutMs); ExecuteRegionFunctionSingleHopOp.execute(pool, region, - serverRegionExecutor, resultCollector, serverToBuckets, retryAttempts, isHA, + serverRegionExecutor, resultCollector, serverToBuckets, isHA, regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier); } @@ -810,7 +810,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc emptySet(), isBucketsAsFilter, isHA, optimizeForWrite, timeoutMs); ExecuteRegionFunctionSingleHopOp.execute(pool, region, - serverRegionExecutor, resultCollector, serverToFilterMap, retryAttempts, + serverRegionExecutor, resultCollector, serverToFilterMap, isHA, regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier); } } diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java index 40a128c..c4c3eb6 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java @@ -30,7 +30,6 @@ import org.apache.geode.GemFireException; import org.apache.geode.InternalGemFireException; import org.apache.geode.annotations.internal.MakeNotStatic; import org.apache.geode.cache.CacheClosedException; -import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.ServerConnectivityException; import org.apache.geode.cache.client.ServerOperationException; import org.apache.geode.cache.client.internal.GetAllOp.GetAllOpImpl; @@ -89,11 +88,10 @@ public class SingleHopClientExecutor { static int submitAllHA(List callableTasks, LocalRegion region, boolean isHA, ResultCollector rc, Set<String> failedNodes, - final int retryAttemptsArg, final PoolImpl pool) { - ClientMetadataService cms = region.getCache().getClientMetadataService(); - int maxRetryAttempts = 0; + ClientMetadataService cms; + int maxRetryAttempts = -1; if (callableTasks != null && !callableTasks.isEmpty()) { List futures = null; @@ -120,15 +118,8 @@ public class SingleHopClientExecutor { throw new InternalGemFireException(e.getMessage()); } catch (ExecutionException ee) { - if (maxRetryAttempts == 0) { - maxRetryAttempts = retryAttemptsArg; - } - - if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) { - // If the retryAttempt is set to default(-1). Try it on all servers once. - // Calculating number of servers when function is re-executed as it involves - // messaging locator. - maxRetryAttempts = pool.getConnectionSource().getAllServers().size() - 1; + if (maxRetryAttempts == -1) { + maxRetryAttempts = pool.calculateRetryAttempts(ee.getCause()); } if (ee.getCause() instanceof InternalFunctionInvocationTargetException) { diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java index a2a343f..e37899d 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java @@ -70,6 +70,7 @@ import org.apache.geode.security.GemFireSecurityException; public class ConnectionManagerImpl implements ConnectionManager { private static final Logger logger = LogService.getLogger(); private static final int NOT_WAITING = -1; + public static final String BORROW_CONN_ERROR_MSG = "Could not create a new connection to server "; private final String poolName; private final PoolStats poolStats; @@ -321,8 +322,7 @@ public class ConnectionManagerImpl implements ConnectionManager { return connection; } - throw new ServerConnectivityException( - "Could not create a new connection to server " + server); + throw new ServerConnectivityException(BORROW_CONN_ERROR_MSG + server); } @Override diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java index ae9ae13..bb11a04 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java @@ -228,9 +228,8 @@ public class ExecuteFunctionOpRetryTest { testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg, (pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(pool .execute(ArgumentMatchers.<AbstractOp>any(), ArgumentMatchers.anyInt())), - failureMode)); - - when(testSupport.getExecutablePool().getRetryAttempts()).thenReturn(retryAttempts); + failureMode), + retryAttempts); args = null; memberMappedArg = mock(MemberMappedArgument.class); diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java index c91816f..1d0cad5 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java @@ -15,6 +15,7 @@ package org.apache.geode.cache.client.internal; import static org.apache.geode.cache.client.internal.ExecuteFunctionTestSupport.HAStatus.HA; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -91,8 +92,6 @@ class ExecuteFunctionTestSupport { * This method has to be {@code static} because it is called before * {@link ExecuteFunctionTestSupport} is constructed. * - * @param whenPoolExecute is the {@link OngoingStubbing} for (one of the ) {@code execute()} - * methods on {@link PoolImpl} * @param failureMode is the {@link FailureMode} that determines the kind of exception * to {@code throw} */ @@ -149,7 +148,7 @@ class ExecuteFunctionTestSupport { ExecuteFunctionTestSupport( final HAStatus haStatus, final FailureMode failureMode, - final BiConsumer<PoolImpl, FailureMode> addPoolMockBehavior) { + final BiConsumer<PoolImpl, FailureMode> addPoolMockBehavior, Integer retryAttempts) { final List<ServerLocation> servers = (List<ServerLocation>) mock(List.class); when(servers.size()).thenReturn(ExecuteFunctionTestSupport.NUMBER_OF_SERVERS); @@ -174,6 +173,9 @@ class ExecuteFunctionTestSupport { executablePool = mock(PoolImpl.class); when(executablePool.getConnectionSource()).thenReturn(connectionSource); + when(executablePool.getRetryAttempts()).thenReturn(retryAttempts); + when(executablePool.calculateRetryAttempts(any(ServerConnectivityException.class))) + .thenCallRealMethod(); addPoolMockBehavior.accept(executablePool, failureMode); } diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java index 74f6748..e92bad1 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java @@ -309,7 +309,7 @@ public class ExecuteRegionFunctionOpRetryTest { default: throw new AssertionError("unknown FailureMode type: " + failureMode); } - }); + }, retryAttempts); executeFunctionMultiHopAndValidate(haStatus, functionIdentifierType, retryAttempts, testSupport.getExecutablePool(), @@ -325,7 +325,8 @@ public class ExecuteRegionFunctionOpRetryTest { testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg, (pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(pool .execute(ArgumentMatchers.<AbstractOp>any(), ArgumentMatchers.anyInt())), - failureMode)); + failureMode), + retryAttempts); reExecuteFunctionMultiHopAndValidate(haStatus, functionIdentifierType, retryAttempts, testSupport.getExecutablePool(), diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java index aef00fb..5649b1b 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java @@ -137,7 +137,7 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest { } private void createMocks(final HAStatus haStatus, - final FailureMode failureModeArg) { + final FailureMode failureModeArg, Integer retryAttempts) { testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg, (pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when( @@ -146,7 +146,8 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest { ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean())), - failureMode)); + failureMode), + retryAttempts); serverToFilterMap = new HashMap<>(); serverToFilterMap.put(new ServerLocation("host1", 10), new HashSet<>()); @@ -158,7 +159,7 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest { final int retryAttempts, final int expectTries, final FailureMode failureMode) { - createMocks(haStatus, failureMode); + createMocks(haStatus, failureMode, retryAttempts); executeFunctionSingleHopAndValidate(haStatus, functionIdentifierType, retryAttempts, testSupport.getExecutablePool(), @@ -182,7 +183,6 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest { () -> ignoreServerConnectivityException(() -> ExecuteRegionFunctionSingleHopOp.execute( executablePool, testSupport.getRegion(), executor, resultCollector, serverToFilterMap, - retryAttempts, testSupport.toBoolean(haStatus), executor1 -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl( testSupport.getRegion().getFullPath(), FUNCTION_NAME, @@ -199,7 +199,6 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest { ignoreServerConnectivityException( () -> ExecuteRegionFunctionSingleHopOp.execute(executablePool, testSupport.getRegion(), executor, resultCollector, serverToFilterMap, - retryAttempts, function.isHA(), executor1 -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl( testSupport.getRegion().getFullPath(), function, diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/PoolImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/PoolImplTest.java new file mode 100644 index 0000000..20f5a90 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/PoolImplTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.client.internal; + +import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.Statistics; +import org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.client.ServerConnectivityException; +import org.apache.geode.cache.client.internal.pooling.ConnectionManagerImpl; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.gms.membership.HostAddress; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.PoolFactoryImpl; +import org.apache.geode.internal.cache.PoolManagerImpl; +import org.apache.geode.internal.monitoring.ThreadsMonitoring; +import org.apache.geode.internal.net.SSLConfigurationFactory; +import org.apache.geode.internal.security.SecurableCommunicationChannel; +import org.apache.geode.test.junit.categories.ClientServerTest; + +@Category({ClientServerTest.class}) +public class PoolImplTest { + + @Test + public void calculateRetryAttemptsDoesNotDecrementRetryCountForFailureDuringBorrowConnection() { + List servers = mock(List.class); + when(servers.size()).thenReturn(1); + ConnectionSource connectionSource = mock(ConnectionSource.class); + when(connectionSource.getAllServers()).thenReturn(servers); + ServerConnectivityException serverConnectivityException = + mock(ServerConnectivityException.class); + when(serverConnectivityException.getMessage()) + .thenReturn(ConnectionManagerImpl.BORROW_CONN_ERROR_MSG); + + PoolImpl poolImpl = spy(getPool(PoolFactory.DEFAULT_RETRY_ATTEMPTS)); + when(poolImpl.getConnectionSource()).thenReturn(connectionSource); + + assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(1); + } + + @Test + public void calculateRetryAttemptsDecrementsRetryCountForFailureAfterSendingTheRequest() { + List servers = mock(List.class); + when(servers.size()).thenReturn(1); + ConnectionSource connectionSource = mock(ConnectionSource.class); + when(connectionSource.getAllServers()).thenReturn(servers); + ServerConnectivityException serverConnectivityException = + mock(ServerConnectivityException.class); + when(serverConnectivityException.getMessage()).thenReturn("Timeout Exception"); + + PoolImpl poolImpl = spy(getPool(PoolFactory.DEFAULT_RETRY_ATTEMPTS)); + when(poolImpl.getConnectionSource()).thenReturn(connectionSource); + + assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(0); + } + + @Test + public void calculateRetryAttemptsReturnsTheRetyCountConfiguredWithPool() { + int retryCount = 1; + List servers = mock(List.class); + when(servers.size()).thenReturn(1); + ConnectionSource connectionSource = mock(ConnectionSource.class); + when(connectionSource.getAllServers()).thenReturn(servers); + ServerConnectivityException serverConnectivityException = + mock(ServerConnectivityException.class); + when(serverConnectivityException.getMessage()).thenReturn("Timeout Exception"); + + PoolImpl poolImpl = spy(getPool(retryCount)); + when(poolImpl.getConnectionSource()).thenReturn(connectionSource); + + assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(retryCount); + } + + private PoolImpl getPool(int retryAttemptsAttribute) { + final DistributionConfig distributionConfig = mock(DistributionConfig.class); + doReturn(new SecurableCommunicationChannel[] {}).when(distributionConfig) + .getSecurableCommunicationChannels(); + + SSLConfigurationFactory.setDistributionConfig(distributionConfig); + + final Properties properties = new Properties(); + properties.put(DURABLE_CLIENT_ID, "1"); + + final Statistics statistics = mock(Statistics.class); + + final PoolFactoryImpl.PoolAttributes poolAttributes = + mock(PoolFactoryImpl.PoolAttributes.class); + + /* + * These are the minimum pool attributes required + * so that basic validation and setup completes successfully. The values of + * these attributes have no importance to the assertions of the test itself. + */ + doReturn(1).when(poolAttributes).getMaxConnections(); + doReturn((long) 10e8).when(poolAttributes).getPingInterval(); + doReturn(retryAttemptsAttribute).when(poolAttributes).getRetryAttempts(); + + final CancelCriterion cancelCriterion = mock(CancelCriterion.class); + + final InternalCache internalCache = mock(InternalCache.class); + doReturn(cancelCriterion).when(internalCache).getCancelCriterion(); + + final InternalDistributedSystem internalDistributedSystem = + mock(InternalDistributedSystem.class); + doReturn(distributionConfig).when(internalDistributedSystem).getConfig(); + doReturn(properties).when(internalDistributedSystem).getProperties(); + doReturn(statistics).when(internalDistributedSystem).createAtomicStatistics(any(), anyString()); + + final PoolManagerImpl poolManager = mock(PoolManagerImpl.class); + doReturn(true).when(poolManager).isNormal(); + + final ThreadsMonitoring tMonitoring = mock(ThreadsMonitoring.class); + + return PoolImpl.create(poolManager, "pool", poolAttributes, new LinkedList<HostAddress>(), + internalDistributedSystem, internalCache, tMonitoring); + } + +} diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java index 2a8cf9f..98d39d6 100644 --- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java +++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java @@ -14,67 +14,39 @@ */ package org.apache.geode.cache.lucene; -import static org.apache.geode.test.awaitility.GeodeAwaitility.await; -import static org.apache.geode.test.dunit.Assert.fail; -import static org.junit.Assert.assertEquals; import java.io.File; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Properties; -import java.util.concurrent.TimeUnit; -import org.apache.commons.io.FileUtils; -import org.apache.logging.log4j.Logger; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.apache.geode.cache.GemFireCache; import org.apache.geode.cache.RegionShortcut; -import org.apache.geode.cache.client.ClientCache; -import org.apache.geode.cache.client.ClientCacheFactory; -import org.apache.geode.cache.client.ClientRegionFactory; -import org.apache.geode.cache.client.ClientRegionShortcut; -import org.apache.geode.cache.lucene.internal.LuceneServiceImpl; -import org.apache.geode.cache.server.CacheServer; import org.apache.geode.cache30.CacheSerializableRunnable; -import org.apache.geode.distributed.Locator; -import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.internal.AvailablePortHelper; -import org.apache.geode.internal.cache.GemFireCacheImpl; -import org.apache.geode.internal.logging.LogService; -import org.apache.geode.internal.serialization.Version; -import org.apache.geode.test.dunit.DistributedTestUtils; import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.IgnoredException; -import org.apache.geode.test.dunit.Invoke; import org.apache.geode.test.dunit.NetworkUtils; import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.internal.DUnitLauncher; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory; import org.apache.geode.test.version.TestVersion; import org.apache.geode.test.version.VersionManager; @RunWith(Parameterized.class) @Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class) -public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4DistributedTestCase { +public abstract class LuceneSearchWithRollingUpgradeDUnit + extends LuceneSearchWithRollingUpgradeTestBase { - @Parameterized.Parameters(name = "from_v{0}, with reindex={1}") + @Parameterized.Parameters(name = "from_v{0}, with reindex={1}, singleHopEnabled={2}") public static Collection<Object[]> data() { Collection<String> luceneVersions = getLuceneVersions(); Collection<Object[]> rval = new ArrayList<>(); luceneVersions.forEach(v -> { - rval.add(new Object[] {v, true}); - rval.add(new Object[] {v, false}); + rval.add(new Object[] {v, true, true}); + rval.add(new Object[] {v, false, true}); }); return rval; } @@ -84,6 +56,10 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu // Lucene Compatibility checks start with Apache Geode v1.2.0 // Removing the versions older than v1.2.0 result.removeIf(s -> TestVersion.compare(s, "1.2.0") < 0); + + // The changes relating to GEODE-7258 is not applied on 1.10.0, skipping rolling + // upgrade for 1.10.0. The change was verified by rolling from develop to develop. + result.removeIf(s -> TestVersion.compare(s, "1.10.0") == 0); if (result.size() < 1) { throw new RuntimeException("No older versions of Geode were found to test against"); } else { @@ -92,15 +68,6 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu return result; } - private File[] testingDirs = new File[3]; - - protected static String INDEX_NAME = "index"; - - private static String diskDir = "LuceneSearchWithRollingUpgradeDUnit"; - - // Each vm will have a cache object - protected static Object cache; - // the old version of Geode we're testing against @Parameterized.Parameter() public String oldVersion; @@ -108,137 +75,8 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu @Parameterized.Parameter(1) public Boolean reindex; - private void deleteVMFiles() { - System.out.println("deleting files in vm" + VM.getCurrentVMNum()); - File pwd = new File("."); - for (File entry : pwd.listFiles()) { - try { - if (entry.isDirectory()) { - FileUtils.deleteDirectory(entry); - } else { - entry.delete(); - } - } catch (Exception e) { - System.out.println("Could not delete " + entry + ": " + e.getMessage()); - } - } - } - - private void deleteWorkingDirFiles() { - Invoke.invokeInEveryVM("delete files", () -> deleteVMFiles()); - } - - @Override - public void postSetUp() { - deleteWorkingDirFiles(); - IgnoredException.addIgnoredException( - "cluster configuration service not available|ConflictingPersistentDataException"); - } - - - Properties getLocatorPropertiesPre91(String locatorsString) { - Properties props = new Properties(); - props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); - props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString); - props.setProperty(DistributionConfig.LOG_LEVEL_NAME, DUnitLauncher.logLevel); - props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true"); - return props; - } - - VM rollClientToCurrentAndCreateRegion(VM oldClient, ClientRegionShortcut shortcut, - String regionName, String[] hostNames, int[] locatorPorts, boolean subscriptionEnabled) { - VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled); - // recreate region on "rolled" client - invokeRunnableInVMs(invokeCreateClientRegion(regionName, shortcut), rollClient); - return rollClient; - } - - private VM rollClientToCurrent(VM oldClient, String[] hostNames, int[] locatorPorts, - boolean subscriptionEnabled) { - oldClient.invoke(invokeCloseCache()); - VM rollClient = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldClient.getId()); - rollClient.invoke(invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, - subscriptionEnabled)); - rollClient.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL)); - return rollClient; - } - - CacheSerializableRunnable invokeCreateClientRegion(final String regionName, - final ClientRegionShortcut shortcut) { - return new CacheSerializableRunnable("execute: createClientRegion") { - @Override - public void run2() { - try { - createClientRegion((GemFireCache) LuceneSearchWithRollingUpgradeDUnit.cache, regionName, - shortcut); - } catch (Exception e) { - fail("Error creating client region", e); - } - } - }; - } - - private static void createClientRegion(GemFireCache cache, String regionName, - ClientRegionShortcut shortcut) { - ClientRegionFactory rf = ((ClientCache) cache).createClientRegionFactory(shortcut); - rf.create(regionName); - } - - CacheSerializableRunnable invokeStartCacheServer(final int port) { - return new CacheSerializableRunnable("execute: startCacheServer") { - @Override - public void run2() { - try { - startCacheServer((GemFireCache) LuceneSearchWithRollingUpgradeDUnit.cache, port); - } catch (Exception e) { - fail("Error creating cache", e); - } - } - }; - } - - private static void startCacheServer(GemFireCache cache, int port) throws Exception { - CacheServer cacheServer = ((GemFireCacheImpl) cache).addCacheServer(); - cacheServer.setPort(port); - cacheServer.start(); - } - - CacheSerializableRunnable invokeCreateClientCache(final Properties systemProperties, - final String[] hosts, final int[] ports, boolean subscriptionEnabled) { - return new CacheSerializableRunnable("execute: createClientCache") { - @Override - public void run2() { - try { - LuceneSearchWithRollingUpgradeDUnit.cache = - createClientCache(systemProperties, hosts, ports, subscriptionEnabled); - } catch (Exception e) { - fail("Error creating client cache", e); - } - } - }; - } - - Properties getClientSystemProperties() { - Properties p = new Properties(); - p.setProperty("mcast-port", "0"); - return p; - } - - - private static ClientCache createClientCache(Properties systemProperties, String[] hosts, - int[] ports, boolean subscriptionEnabled) { - ClientCacheFactory cf = new ClientCacheFactory(systemProperties); - if (subscriptionEnabled) { - cf.setPoolSubscriptionEnabled(true); - cf.setPoolSubscriptionRedundancy(-1); - } - int hostsLength = hosts.length; - for (int i = 0; i < hostsLength; i++) { - cf.addPoolLocator(hosts[i], ports[i]); - } - - return cf.create(); - } + @Parameterized.Parameter(2) + public Boolean singleHopEnabled; // We start an "old" locator and old servers // We roll the locator @@ -303,7 +141,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu locatorString); server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType, - testingDirs[0], shortcutName, regionName, locatorPorts); + testingDirs[0], shortcutName, regionName, locatorPorts, reindex); verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server1); expectedRegionSize += 5; putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName, expectedRegionSize, 5, @@ -313,7 +151,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu 20, server1, server3); server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType, - testingDirs[1], shortcutName, regionName, locatorPorts); + testingDirs[1], shortcutName, regionName, locatorPorts, reindex); verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server2); expectedRegionSize += 5; putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 15, @@ -323,7 +161,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu 30, server2, server3); server3 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server3, regionType, - testingDirs[2], shortcutName, regionName, locatorPorts); + testingDirs[2], shortcutName, regionName, locatorPorts, reindex); verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server3); putSerializableObjectAndVerifyLuceneQueryResult(server3, regionName, expectedRegionSize, 15, 25, server1, server2); @@ -340,627 +178,4 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu } } - void putSerializableObjectAndVerifyLuceneQueryResult(VM putter, String regionName, - int expectedRegionSize, int start, int end, VM... vms) throws Exception { - // do puts - putSerializableObject(putter, regionName, start, end); - - // verify present in others - verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, vms); - } - - void putSerializableObject(VM putter, String regionName, int start, int end) - throws Exception { - for (int i = start; i < end; i++) { - Class aClass = Thread.currentThread().getContextClassLoader() - .loadClass("org.apache.geode.cache.query.data.Portfolio"); - Constructor portfolioConstructor = aClass.getConstructor(int.class); - Object serializableObject = portfolioConstructor.newInstance(i); - putter.invoke(invokePut(regionName, i, serializableObject)); - } - } - - private void waitForRegionToHaveExpectedSize(String regionName, int expectedRegionSize) { - await().untilAsserted(() -> { - Object region = - cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName); - int regionSize = (int) region.getClass().getMethod("size").invoke(region); - assertEquals("Region size not as expected after 60 seconds", expectedRegionSize, - regionSize); - }); - } - - void verifyLuceneQueryResults(String regionName, int expectedRegionSize) - throws Exception { - Class luceneServiceProvider = Thread.currentThread().getContextClassLoader() - .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider"); - Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class); - Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache); - luceneService.getClass() - .getMethod("waitUntilFlushed", String.class, String.class, long.class, TimeUnit.class) - .invoke(luceneService, INDEX_NAME, regionName, 60, TimeUnit.SECONDS); - Method createLuceneQueryFactoryMethod = - luceneService.getClass().getMethod("createLuceneQueryFactory"); - createLuceneQueryFactoryMethod.setAccessible(true); - Object luceneQueryFactory = createLuceneQueryFactoryMethod.invoke(luceneService); - Object luceneQuery = luceneQueryFactory.getClass() - .getMethod("create", String.class, String.class, String.class, String.class) - .invoke(luceneQueryFactory, INDEX_NAME, regionName, "active", "status"); - - Collection resultsActive = executeLuceneQuery(luceneQuery); - - luceneQuery = luceneQueryFactory.getClass() - .getMethod("create", String.class, String.class, String.class, String.class) - .invoke(luceneQueryFactory, INDEX_NAME, regionName, "inactive", "status"); - - Collection resultsInactive = executeLuceneQuery(luceneQuery); - - assertEquals("Result size not as expected ", expectedRegionSize, - resultsActive.size() + resultsInactive.size()); - } - - private Collection executeLuceneQuery(Object luceneQuery) - throws IllegalAccessException, InvocationTargetException, NoSuchMethodException { - Collection results = null; - int retryCount = 10; - while (true) { - try { - results = (Collection) luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery); - break; - } catch (Exception ex) { - if (!ex.getCause().getMessage().contains("currently indexing")) { - throw ex; - } - if (--retryCount == 0) { - throw ex; - } - } - } - return results; - - } - - private void verifyLuceneQueryResultInEachVM(String regionName, int expectedRegionSize, - VM... vms) { - for (VM vm : vms) { - vm.invoke(() -> waitForRegionToHaveExpectedSize(regionName, expectedRegionSize)); - vm.invoke(() -> verifyLuceneQueryResults(regionName, expectedRegionSize)); - } - - } - - void invokeRunnableInVMs(CacheSerializableRunnable runnable, VM... vms) { - for (VM vm : vms) { - vm.invoke(runnable); - } - } - - // Used to close cache and make sure we attempt on all vms even if some do not have a cache - void invokeRunnableInVMs(boolean catchErrors, CacheSerializableRunnable runnable, - VM... vms) { - for (VM vm : vms) { - try { - vm.invoke(runnable); - } catch (Exception e) { - if (!catchErrors) { - throw e; - } - } - } - } - - private VM rollServerToCurrent(VM oldServer, int[] locatorPorts) { - // Roll the server - oldServer.invoke(invokeCloseCache()); - VM rollServer = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldServer.getId()); - rollServer.invoke(invokeCreateCache(locatorPorts == null ? getSystemPropertiesPost71() - : getSystemPropertiesPost71(locatorPorts))); - rollServer.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL)); - return rollServer; - } - - VM rollServerToCurrentCreateLuceneIndexAndCreateRegion(VM oldServer, String regionType, - File diskdir, String shortcutName, String regionName, int[] locatorPorts) { - VM rollServer = rollServerToCurrent(oldServer, locatorPorts); - return createLuceneIndexAndRegionOnRolledServer(regionType, diskdir, shortcutName, regionName, - rollServer); - } - - private VM createLuceneIndexAndRegionOnRolledServer(String regionType, File diskdir, - String shortcutName, String regionName, VM rollServer) { - - Boolean serializeIt = reindex; - rollServer.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = serializeIt); - rollServer.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME)); - // recreate region on "rolled" server - if ((regionType.equals("persistentPartitioned"))) { - CacheSerializableRunnable runnable = - invokeCreatePersistentPartitionedRegion(regionName, diskdir); - invokeRunnableInVMs(runnable, rollServer); - } else { - invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), rollServer); - } - rollServer.invoke(invokeRebalance()); - return rollServer; - } - - VM rollServerToCurrentAndCreateRegionOnly(VM oldServer, String regionType, File diskdir, - String shortcutName, String regionName, int[] locatorPorts) { - VM rollServer = rollServerToCurrent(oldServer, locatorPorts); - // recreate region on "rolled" server - if ((regionType.equals("persistentPartitioned"))) { - CacheSerializableRunnable runnable = - invokeCreatePersistentPartitionedRegion(regionName, diskdir); - invokeRunnableInVMs(runnable, rollServer); - } else { - invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), rollServer); - } - rollServer.invoke(invokeRebalance()); - return rollServer; - } - - VM rollLocatorToCurrent(VM oldLocator, final String serverHostName, final int port, - final String testName, final String locatorString) { - // Roll the locator - oldLocator.invoke(invokeStopLocator()); - VM rollLocator = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldLocator.getId()); - final Properties props = new Properties(); - props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); - rollLocator.invoke(invokeStartLocator(serverHostName, port, testName, locatorString, props)); - return rollLocator; - } - - // Due to licensing changes - private Properties getSystemPropertiesPost71() { - Properties props = getSystemProperties(); - return props; - } - - // Due to licensing changes - private Properties getSystemPropertiesPost71(int[] locatorPorts) { - Properties props = getSystemProperties(locatorPorts); - return props; - } - - private Properties getSystemProperties() { - Properties props = DistributedTestUtils.getAllDistributedSystemProperties(new Properties()); - props.remove("disable-auto-reconnect"); - props.remove(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME); - props.remove(DistributionConfig.LOCK_MEMORY_NAME); - return props; - } - - Properties getSystemProperties(int[] locatorPorts) { - Properties p = new Properties(); - String locatorString = getLocatorString(locatorPorts); - p.setProperty("locators", locatorString); - p.setProperty("mcast-port", "0"); - return p; - } - - static String getLocatorString(int locatorPort) { - String locatorString = getDUnitLocatorAddress() + "[" + locatorPort + "]"; - return locatorString; - } - - static String getLocatorString(int[] locatorPorts) { - StringBuilder locatorString = new StringBuilder(); - int numLocators = locatorPorts.length; - for (int i = 0; i < numLocators; i++) { - locatorString.append(getLocatorString(locatorPorts[i])); - if (i + 1 < numLocators) { - locatorString.append(","); - } - } - return locatorString.toString(); - } - - private CacheSerializableRunnable invokeStartLocator(final String serverHostName, final int port, - final String testName, final String locatorsString, final Properties props) { - return new CacheSerializableRunnable("execute: startLocator") { - @Override - public void run2() { - try { - startLocator(serverHostName, port, testName, locatorsString, props); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - } - - CacheSerializableRunnable invokeStartLocator(final String serverHostName, final int port, - final Properties props) { - return new CacheSerializableRunnable("execute: startLocator") { - @Override - public void run2() { - try { - startLocator(serverHostName, port, props); - } catch (Exception e) { - fail("Error starting locators", e); - } - } - }; - } - - CacheSerializableRunnable invokeCreateCache(final Properties systemProperties) { - return new CacheSerializableRunnable("execute: createCache") { - @Override - public void run2() { - try { - LuceneSearchWithRollingUpgradeDUnit.cache = createCache(systemProperties); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - } - - private CacheSerializableRunnable invokeAssertVersion(final short version) { - return new CacheSerializableRunnable("execute: assertVersion") { - @Override - public void run2() { - try { - assertVersion(LuceneSearchWithRollingUpgradeDUnit.cache, version); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - } - - CacheSerializableRunnable invokeCreateRegion(final String regionName, - final String shortcutName) { - return new CacheSerializableRunnable("execute: createRegion") { - @Override - public void run2() { - try { - createRegion(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, shortcutName); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - } - - private CacheSerializableRunnable invokeCreatePersistentPartitionedRegion(final String regionName, - final File diskstore) { - return new CacheSerializableRunnable("execute: createPersistentPartitonedRegion") { - @Override - public void run2() { - try { - createPersistentPartitonedRegion(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, - diskstore); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - } - - private CacheSerializableRunnable invokePut(final String regionName, final Object key, - final Object value) { - return new CacheSerializableRunnable("execute: put") { - @Override - public void run2() { - try { - put(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, key, value); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - } - - CacheSerializableRunnable invokeStopLocator() { - return new CacheSerializableRunnable("execute: stopLocator") { - @Override - public void run2() { - try { - stopLocator(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - } - - CacheSerializableRunnable invokeCloseCache() { - return new CacheSerializableRunnable("execute: closeCache") { - @Override - public void run2() { - try { - closeCache(LuceneSearchWithRollingUpgradeDUnit.cache); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - } - - private CacheSerializableRunnable invokeRebalance() { - return new CacheSerializableRunnable("execute: rebalance") { - @Override - public void run2() { - try { - rebalance(LuceneSearchWithRollingUpgradeDUnit.cache); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - } - - private void deleteDiskStores() { - try { - FileUtils.deleteDirectory(new File(diskDir).getAbsoluteFile()); - } catch (IOException e) { - throw new Error("Error deleting files", e); - } - } - - private static Object createCache(Properties systemProperties) throws Exception { - - Class distConfigClass = Thread.currentThread().getContextClassLoader() - .loadClass("org.apache.geode.distributed.internal.DistributionConfigImpl"); - boolean disableConfig = true; - try { - distConfigClass.getDeclaredField("useSharedConfiguration"); - } catch (NoSuchFieldException e) { - disableConfig = false; - } - if (disableConfig) { - systemProperties.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false"); - } - - Class cacheFactoryClass = Thread.currentThread().getContextClassLoader() - .loadClass("org.apache.geode.cache.CacheFactory"); - Constructor constructor = cacheFactoryClass.getConstructor(Properties.class); - constructor.setAccessible(true); - Object cacheFactory = constructor.newInstance(systemProperties); - - Method createMethod = cacheFactoryClass.getMethod("create"); - createMethod.setAccessible(true); - Object cache = createMethod.invoke(cacheFactory); - return cache; - } - - private static Object getRegion(Object cache, String regionName) throws Exception { - return cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName); - } - - private static Object put(Object cache, String regionName, Object key, Object value) - throws Exception { - Object region = getRegion(cache, regionName); - return region.getClass().getMethod("put", Object.class, Object.class).invoke(region, key, - value); - } - - private static void createRegion(Object cache, String regionName, String shortcutName) - throws Exception { - Class aClass = Thread.currentThread().getContextClassLoader() - .loadClass("org.apache.geode.cache.RegionShortcut"); - Object[] enumConstants = aClass.getEnumConstants(); - Object shortcut = null; - int length = enumConstants.length; - for (int i = 0; i < length; i++) { - Object constant = enumConstants[i]; - if (((Enum) constant).name().equals(shortcutName)) { - shortcut = constant; - break; - } - } - - Method createRegionFactoryMethod = cache.getClass().getMethod("createRegionFactory", aClass); - createRegionFactoryMethod.setAccessible(true); - Object regionFactory = createRegionFactoryMethod.invoke(cache, shortcut); - Method createMethod = regionFactory.getClass().getMethod("create", String.class); - createMethod.setAccessible(true); - createMethod.invoke(regionFactory, regionName); - } - - static void createLuceneIndex(Object cache, String regionName, String indexName) - throws Exception { - Class luceneServiceProvider = Thread.currentThread().getContextClassLoader() - .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider"); - Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class); - Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache); - Method createLuceneIndexFactoryMethod = - luceneService.getClass().getMethod("createIndexFactory"); - createLuceneIndexFactoryMethod.setAccessible(true); - Object luceneIndexFactory = createLuceneIndexFactoryMethod.invoke(luceneService); - luceneIndexFactory.getClass().getMethod("addField", String.class).invoke(luceneIndexFactory, - "status"); - luceneIndexFactory.getClass().getMethod("create", String.class, String.class) - .invoke(luceneIndexFactory, indexName, regionName); - } - - static void createLuceneIndexOnExistingRegion(Object cache, String regionName, - String indexName) throws Exception { - Class luceneServiceProvider = Thread.currentThread().getContextClassLoader() - .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider"); - Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class); - Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache); - Method createLuceneIndexFactoryMethod = - luceneService.getClass().getMethod("createIndexFactory"); - createLuceneIndexFactoryMethod.setAccessible(true); - Object luceneIndexFactory = createLuceneIndexFactoryMethod.invoke(luceneService); - luceneIndexFactory.getClass().getMethod("addField", String.class).invoke(luceneIndexFactory, - "status"); - luceneIndexFactory.getClass().getMethod("create", String.class, String.class, boolean.class) - .invoke(luceneIndexFactory, indexName, regionName, true); - } - - private static void createPersistentPartitonedRegion(Object cache, String regionName, - File diskStore) throws Exception { - Object store = cache.getClass().getMethod("findDiskStore", String.class).invoke(cache, "store"); - Class dataPolicyObject = Thread.currentThread().getContextClassLoader() - .loadClass("org.apache.geode.cache.DataPolicy"); - Object dataPolicy = dataPolicyObject.getField("PERSISTENT_PARTITION").get(null); - if (store == null) { - Object dsf = cache.getClass().getMethod("createDiskStoreFactory").invoke(cache); - dsf.getClass().getMethod("setMaxOplogSize", long.class).invoke(dsf, 1L); - dsf.getClass().getMethod("setDiskDirs", File[].class).invoke(dsf, - new Object[] {new File[] {diskStore.getAbsoluteFile()}}); - dsf.getClass().getMethod("create", String.class).invoke(dsf, "store"); - } - Object rf = cache.getClass().getMethod("createRegionFactory").invoke(cache); - rf.getClass().getMethod("setDiskStoreName", String.class).invoke(rf, "store"); - rf.getClass().getMethod("setDataPolicy", dataPolicy.getClass()).invoke(rf, dataPolicy); - rf.getClass().getMethod("create", String.class).invoke(rf, regionName); - } - - private static void assertVersion(Object cache, short ordinal) throws Exception { - Class idmClass = Thread.currentThread().getContextClassLoader() - .loadClass("org.apache.geode.distributed.internal.membership.InternalDistributedMember"); - Method getDSMethod = cache.getClass().getMethod("getDistributedSystem"); - getDSMethod.setAccessible(true); - Object ds = getDSMethod.invoke(cache); - - Method getDistributedMemberMethod = ds.getClass().getMethod("getDistributedMember"); - getDistributedMemberMethod.setAccessible(true); - Object member = getDistributedMemberMethod.invoke(ds); - Method getVersionObjectMethod = member.getClass().getMethod("getVersionObject"); - getVersionObjectMethod.setAccessible(true); - Object thisVersion = getVersionObjectMethod.invoke(member); - Method getOrdinalMethod = thisVersion.getClass().getMethod("ordinal"); - getOrdinalMethod.setAccessible(true); - short thisOrdinal = (Short) getOrdinalMethod.invoke(thisVersion); - if (ordinal != thisOrdinal) { - throw new Error( - "Version ordinal:" + thisOrdinal + " was not the expected ordinal of:" + ordinal); - } - } - - private static void stopCacheServers(Object cache) throws Exception { - Method getCacheServersMethod = cache.getClass().getMethod("getCacheServers"); - getCacheServersMethod.setAccessible(true); - List cacheServers = (List) getCacheServersMethod.invoke(cache); - Method stopMethod = null; - for (Object cs : cacheServers) { - if (stopMethod == null) { - stopMethod = cs.getClass().getMethod("stop"); - } - stopMethod.setAccessible(true); - stopMethod.invoke(cs); - } - } - - private static void closeCache(Object cache) throws Exception { - if (cache == null) { - return; - } - Method isClosedMethod = cache.getClass().getMethod("isClosed"); - isClosedMethod.setAccessible(true); - boolean cacheClosed = (Boolean) isClosedMethod.invoke(cache); - if (cache != null && !cacheClosed) { - stopCacheServers(cache); - Method method = cache.getClass().getMethod("close"); - method.setAccessible(true); - method.invoke(cache); - long startTime = System.currentTimeMillis(); - while (!cacheClosed && System.currentTimeMillis() - startTime < 30000) { - try { - Thread.sleep(1000); - Method cacheClosedMethod = cache.getClass().getMethod("isClosed"); - cacheClosedMethod.setAccessible(true); - cacheClosed = (Boolean) cacheClosedMethod.invoke(cache); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - } - - private static void rebalance(Object cache) throws Exception { - Method getRMMethod = cache.getClass().getMethod("getResourceManager"); - getRMMethod.setAccessible(true); - Object manager = getRMMethod.invoke(cache); - - Method createRebalanceFactoryMethod = manager.getClass().getMethod("createRebalanceFactory"); - createRebalanceFactoryMethod.setAccessible(true); - Object rebalanceFactory = createRebalanceFactoryMethod.invoke(manager); - Method m = rebalanceFactory.getClass().getMethod("start"); - m.setAccessible(true); - Object op = m.invoke(rebalanceFactory); - - // Wait until the rebalance is complete - try { - Method getResultsMethod = op.getClass().getMethod("getResults"); - getResultsMethod.setAccessible(true); - Object results = getResultsMethod.invoke(op); - Method getTotalTimeMethod = results.getClass().getMethod("getTotalTime"); - getTotalTimeMethod.setAccessible(true); - System.out.println("Took " + getTotalTimeMethod.invoke(results) + " milliseconds\n"); - Method getTotalBucketsMethod = results.getClass().getMethod("getTotalBucketTransferBytes"); - getTotalBucketsMethod.setAccessible(true); - System.out.println("Transfered " + getTotalBucketsMethod.invoke(results) + "bytes\n"); - } catch (Exception e) { - Thread.currentThread().interrupt(); - throw e; - } - } - - /** - * Starts a locator with given configuration. - */ - private static void startLocator(final String serverHostName, final int port, - final String testName, final String locatorsString, final Properties props) throws Exception { - props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); - props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString); - Logger logger = LogService.getLogger(); - props.setProperty(DistributionConfig.LOG_LEVEL_NAME, logger.getLevel().name()); - - InetAddress bindAddr; - try { - bindAddr = InetAddress.getByName(serverHostName);// getServerHostName(vm.getHost())); - } catch (UnknownHostException uhe) { - throw new Error("While resolving bind address ", uhe); - } - - File logFile = new File(testName + "-locator" + port + ".log"); - Class locatorClass = Thread.currentThread().getContextClassLoader() - .loadClass("org.apache.geode.distributed.Locator"); - Method startLocatorAndDSMethod = - locatorClass.getMethod("startLocatorAndDS", int.class, File.class, InetAddress.class, - Properties.class, boolean.class, boolean.class, String.class); - startLocatorAndDSMethod.setAccessible(true); - startLocatorAndDSMethod.invoke(null, port, logFile, bindAddr, props, true, true, null); - } - - private static void startLocator(final String serverHostName, final int port, Properties props) - throws Exception { - - - InetAddress bindAddr = null; - try { - bindAddr = InetAddress.getByName(serverHostName);// getServerHostName(vm.getHost())); - } catch (UnknownHostException uhe) { - throw new Error("While resolving bind address ", uhe); - } - - Locator.startLocatorAndDS(port, new File(""), bindAddr, props, true, true, null); - Thread.sleep(5000); // bug in 1.0 - cluster config service not immediately available - } - - private static void stopLocator() throws Exception { - Class internalLocatorClass = Thread.currentThread().getContextClassLoader() - .loadClass("org.apache.geode.distributed.internal.InternalLocator"); - Method locatorMethod = internalLocatorClass.getMethod("getLocator"); - locatorMethod.setAccessible(true); - Object locator = locatorMethod.invoke(null); - Method stopLocatorMethod = locator.getClass().getMethod("stop"); - stopLocatorMethod.setAccessible(true); - stopLocatorMethod.invoke(locator); - } - - /** - * Get the port that the standard dunit locator is listening on. - * - * @return locator address - */ - private static String getDUnitLocatorAddress() { - return Host.getHost(0).getHostName(); - } - } diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java similarity index 75% copy from geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java copy to geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java index 2a8cf9f..a748687 100644 --- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java +++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java @@ -25,7 +25,6 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Properties; @@ -33,11 +32,8 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.logging.log4j.Logger; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.apache.geode.cache.GemFireCache; -import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.client.ClientRegionFactory; @@ -47,7 +43,6 @@ import org.apache.geode.cache.server.CacheServer; import org.apache.geode.cache30.CacheSerializableRunnable; import org.apache.geode.distributed.Locator; import org.apache.geode.distributed.internal.DistributionConfig; -import org.apache.geode.internal.AvailablePortHelper; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.serialization.Version; @@ -55,60 +50,23 @@ import org.apache.geode.test.dunit.DistributedTestUtils; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.Invoke; -import org.apache.geode.test.dunit.NetworkUtils; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.internal.DUnitLauncher; import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; -import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory; -import org.apache.geode.test.version.TestVersion; import org.apache.geode.test.version.VersionManager; -@RunWith(Parameterized.class) [email protected](CategoryWithParameterizedRunnerFactory.class) -public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4DistributedTestCase { +public abstract class LuceneSearchWithRollingUpgradeTestBase extends JUnit4DistributedTestCase { - - @Parameterized.Parameters(name = "from_v{0}, with reindex={1}") - public static Collection<Object[]> data() { - Collection<String> luceneVersions = getLuceneVersions(); - Collection<Object[]> rval = new ArrayList<>(); - luceneVersions.forEach(v -> { - rval.add(new Object[] {v, true}); - rval.add(new Object[] {v, false}); - }); - return rval; - } - - private static Collection<String> getLuceneVersions() { - List<String> result = VersionManager.getInstance().getVersionsWithoutCurrent(); - // Lucene Compatibility checks start with Apache Geode v1.2.0 - // Removing the versions older than v1.2.0 - result.removeIf(s -> TestVersion.compare(s, "1.2.0") < 0); - if (result.size() < 1) { - throw new RuntimeException("No older versions of Geode were found to test against"); - } else { - System.out.println("running against these versions: " + result); - } - return result; - } - - private File[] testingDirs = new File[3]; + protected File[] testingDirs = new File[3]; protected static String INDEX_NAME = "index"; - private static String diskDir = "LuceneSearchWithRollingUpgradeDUnit"; + protected static String diskDir = "LuceneSearchWithRollingUpgradeTestBase"; // Each vm will have a cache object protected static Object cache; - // the old version of Geode we're testing against - @Parameterized.Parameter() - public String oldVersion; - - @Parameterized.Parameter(1) - public Boolean reindex; - - private void deleteVMFiles() { + protected void deleteVMFiles() { System.out.println("deleting files in vm" + VM.getCurrentVMNum()); File pwd = new File("."); for (File entry : pwd.listFiles()) { @@ -124,7 +82,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu } } - private void deleteWorkingDirFiles() { + protected void deleteWorkingDirFiles() { Invoke.invokeInEveryVM("delete files", () -> deleteVMFiles()); } @@ -145,20 +103,24 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu return props; } - VM rollClientToCurrentAndCreateRegion(VM oldClient, ClientRegionShortcut shortcut, - String regionName, String[] hostNames, int[] locatorPorts, boolean subscriptionEnabled) { - VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled); + VM rollClientToCurrentAndCreateRegion(VM oldClient, + ClientRegionShortcut shortcut, + String regionName, String[] hostNames, int[] locatorPorts, + boolean subscriptionEnabled, boolean singleHopEnabled) { + VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled, + singleHopEnabled); // recreate region on "rolled" client invokeRunnableInVMs(invokeCreateClientRegion(regionName, shortcut), rollClient); return rollClient; } - private VM rollClientToCurrent(VM oldClient, String[] hostNames, int[] locatorPorts, - boolean subscriptionEnabled) { + protected VM rollClientToCurrent(VM oldClient, String[] hostNames, + int[] locatorPorts, + boolean subscriptionEnabled, boolean singleHopEnabled) { oldClient.invoke(invokeCloseCache()); VM rollClient = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldClient.getId()); rollClient.invoke(invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, - subscriptionEnabled)); + subscriptionEnabled, singleHopEnabled)); rollClient.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL)); return rollClient; } @@ -169,7 +131,8 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu @Override public void run2() { try { - createClientRegion((GemFireCache) LuceneSearchWithRollingUpgradeDUnit.cache, regionName, + createClientRegion((GemFireCache) LuceneSearchWithRollingUpgradeTestBase.cache, + regionName, shortcut); } catch (Exception e) { fail("Error creating client region", e); @@ -178,7 +141,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu }; } - private static void createClientRegion(GemFireCache cache, String regionName, + protected static void createClientRegion(GemFireCache cache, String regionName, ClientRegionShortcut shortcut) { ClientRegionFactory rf = ((ClientCache) cache).createClientRegionFactory(shortcut); rf.create(regionName); @@ -189,7 +152,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu @Override public void run2() { try { - startCacheServer((GemFireCache) LuceneSearchWithRollingUpgradeDUnit.cache, port); + startCacheServer((GemFireCache) LuceneSearchWithRollingUpgradeTestBase.cache, port); } catch (Exception e) { fail("Error creating cache", e); } @@ -197,20 +160,23 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu }; } - private static void startCacheServer(GemFireCache cache, int port) throws Exception { + protected static void startCacheServer(GemFireCache cache, int port) throws Exception { CacheServer cacheServer = ((GemFireCacheImpl) cache).addCacheServer(); cacheServer.setPort(port); cacheServer.start(); } - CacheSerializableRunnable invokeCreateClientCache(final Properties systemProperties, - final String[] hosts, final int[] ports, boolean subscriptionEnabled) { + CacheSerializableRunnable invokeCreateClientCache( + final Properties systemProperties, + final String[] hosts, final int[] ports, boolean subscriptionEnabled, + boolean singleHopEnabled) { return new CacheSerializableRunnable("execute: createClientCache") { @Override public void run2() { try { - LuceneSearchWithRollingUpgradeDUnit.cache = - createClientCache(systemProperties, hosts, ports, subscriptionEnabled); + LuceneSearchWithRollingUpgradeTestBase.cache = + createClientCache(systemProperties, hosts, ports, subscriptionEnabled, + singleHopEnabled); } catch (Exception e) { fail("Error creating client cache", e); } @@ -225,13 +191,15 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu } - private static ClientCache createClientCache(Properties systemProperties, String[] hosts, - int[] ports, boolean subscriptionEnabled) { + protected static ClientCache createClientCache(Properties systemProperties, + String[] hosts, + int[] ports, boolean subscriptionEnabled, boolean singleHopEnabled) { ClientCacheFactory cf = new ClientCacheFactory(systemProperties); if (subscriptionEnabled) { cf.setPoolSubscriptionEnabled(true); cf.setPoolSubscriptionRedundancy(-1); } + cf.setPoolPRSingleHopEnabled(singleHopEnabled); int hostsLength = hosts.length; for (int i = 0; i < hostsLength; i++) { cf.addPoolLocator(hosts[i], ports[i]); @@ -240,105 +208,6 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu return cf.create(); } - // We start an "old" locator and old servers - // We roll the locator - // Now we roll all the servers from old to new - void executeLuceneQueryWithServerRollOvers(String regionType, String startingVersion) - throws Exception { - final Host host = Host.getHost(0); - VM server1 = host.getVM(startingVersion, 0); - VM server2 = host.getVM(startingVersion, 1); - VM server3 = host.getVM(startingVersion, 2); - VM locator = host.getVM(startingVersion, 3); - - - String regionName = "aRegion"; - String shortcutName = null; - if ((regionType.equals("partitionedRedundant"))) { - shortcutName = RegionShortcut.PARTITION_REDUNDANT.name(); - } else if ((regionType.equals("persistentPartitioned"))) { - shortcutName = RegionShortcut.PARTITION_PERSISTENT.name(); - for (int i = 0; i < testingDirs.length; i++) { - testingDirs[i] = new File(diskDir, "diskStoreVM_" + String.valueOf(host.getVM(i).getId())) - .getAbsoluteFile(); - if (!testingDirs[i].exists()) { - System.out.println(" Creating diskdir for server: " + i); - testingDirs[i].mkdirs(); - } - } - } - - int[] locatorPorts = AvailablePortHelper.getRandomAvailableTCPPorts(1); - String hostName = NetworkUtils.getServerHostName(host); - String locatorString = getLocatorString(locatorPorts); - final Properties locatorProps = new Properties(); - // configure all class loaders for each vm - - try { - locator.invoke(invokeStartLocator(hostName, locatorPorts[0], getTestMethodName(), - locatorString, locatorProps)); - invokeRunnableInVMs(invokeCreateCache(getSystemProperties(locatorPorts)), server1, server2, - server3); - - // Create Lucene Index - server1.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME)); - server2.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME)); - server3.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME)); - - // create region - if ((regionType.equals("persistentPartitioned"))) { - for (int i = 0; i < testingDirs.length; i++) { - CacheSerializableRunnable runnable = - invokeCreatePersistentPartitionedRegion(regionName, testingDirs[i]); - invokeRunnableInVMs(runnable, host.getVM(i)); - } - } else { - invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), server1, server2, - server3); - } - int expectedRegionSize = 10; - putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName, expectedRegionSize, 0, - 10, server2, server3); - locator = rollLocatorToCurrent(locator, hostName, locatorPorts[0], getTestMethodName(), - locatorString); - - server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType, - testingDirs[0], shortcutName, regionName, locatorPorts); - verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server1); - expectedRegionSize += 5; - putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName, expectedRegionSize, 5, - 15, server2, server3); - expectedRegionSize += 5; - putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 10, - 20, server1, server3); - - server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType, - testingDirs[1], shortcutName, regionName, locatorPorts); - verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server2); - expectedRegionSize += 5; - putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 15, - 25, server1, server3); - expectedRegionSize += 5; - putSerializableObjectAndVerifyLuceneQueryResult(server3, regionName, expectedRegionSize, 20, - 30, server2, server3); - - server3 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server3, regionType, - testingDirs[2], shortcutName, regionName, locatorPorts); - verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server3); - putSerializableObjectAndVerifyLuceneQueryResult(server3, regionName, expectedRegionSize, 15, - 25, server1, server2); - putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName, expectedRegionSize, 20, - 30, server1, server2, server3); - - - } finally { - invokeRunnableInVMs(true, invokeStopLocator(), locator); - invokeRunnableInVMs(true, invokeCloseCache(), server1, server2, server3); - if ((regionType.equals("persistentPartitioned"))) { - deleteDiskStores(); - } - } - } void putSerializableObjectAndVerifyLuceneQueryResult(VM putter, String regionName, int expectedRegionSize, int start, int end, VM... vms) throws Exception { @@ -399,7 +268,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu resultsActive.size() + resultsInactive.size()); } - private Collection executeLuceneQuery(Object luceneQuery) + protected Collection executeLuceneQuery(Object luceneQuery) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException { Collection results = null; int retryCount = 10; @@ -420,7 +289,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu } - private void verifyLuceneQueryResultInEachVM(String regionName, int expectedRegionSize, + protected void verifyLuceneQueryResultInEachVM(String regionName, int expectedRegionSize, VM... vms) { for (VM vm : vms) { vm.invoke(() -> waitForRegionToHaveExpectedSize(regionName, expectedRegionSize)); @@ -459,15 +328,18 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu return rollServer; } - VM rollServerToCurrentCreateLuceneIndexAndCreateRegion(VM oldServer, String regionType, - File diskdir, String shortcutName, String regionName, int[] locatorPorts) { + VM rollServerToCurrentCreateLuceneIndexAndCreateRegion(VM oldServer, + String regionType, + File diskdir, String shortcutName, String regionName, int[] locatorPorts, + boolean reindex) { VM rollServer = rollServerToCurrent(oldServer, locatorPorts); return createLuceneIndexAndRegionOnRolledServer(regionType, diskdir, shortcutName, regionName, - rollServer); + rollServer, reindex); } - private VM createLuceneIndexAndRegionOnRolledServer(String regionType, File diskdir, - String shortcutName, String regionName, VM rollServer) { + private VM createLuceneIndexAndRegionOnRolledServer(String regionType, + File diskdir, + String shortcutName, String regionName, VM rollServer, boolean reindex) { Boolean serializeIt = reindex; rollServer.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = serializeIt); @@ -555,7 +427,8 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu return locatorString.toString(); } - private CacheSerializableRunnable invokeStartLocator(final String serverHostName, final int port, + protected CacheSerializableRunnable invokeStartLocator(final String serverHostName, + final int port, final String testName, final String locatorsString, final Properties props) { return new CacheSerializableRunnable("execute: startLocator") { @Override @@ -588,7 +461,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu @Override public void run2() { try { - LuceneSearchWithRollingUpgradeDUnit.cache = createCache(systemProperties); + LuceneSearchWithRollingUpgradeTestBase.cache = createCache(systemProperties); } catch (Exception e) { throw new RuntimeException(e); } @@ -601,7 +474,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu @Override public void run2() { try { - assertVersion(LuceneSearchWithRollingUpgradeDUnit.cache, version); + assertVersion(LuceneSearchWithRollingUpgradeTestBase.cache, version); } catch (Exception e) { throw new RuntimeException(e); } @@ -615,7 +488,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu @Override public void run2() { try { - createRegion(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, shortcutName); + createRegion(LuceneSearchWithRollingUpgradeTestBase.cache, regionName, shortcutName); } catch (Exception e) { throw new RuntimeException(e); } @@ -623,13 +496,14 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu }; } - private CacheSerializableRunnable invokeCreatePersistentPartitionedRegion(final String regionName, + protected CacheSerializableRunnable invokeCreatePersistentPartitionedRegion( + final String regionName, final File diskstore) { return new CacheSerializableRunnable("execute: createPersistentPartitonedRegion") { @Override public void run2() { try { - createPersistentPartitonedRegion(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, + createPersistentPartitonedRegion(LuceneSearchWithRollingUpgradeTestBase.cache, regionName, diskstore); } catch (Exception e) { throw new RuntimeException(e); @@ -638,13 +512,13 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu }; } - private CacheSerializableRunnable invokePut(final String regionName, final Object key, + protected CacheSerializableRunnable invokePut(final String regionName, final Object key, final Object value) { return new CacheSerializableRunnable("execute: put") { @Override public void run2() { try { - put(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, key, value); + put(LuceneSearchWithRollingUpgradeTestBase.cache, regionName, key, value); } catch (Exception e) { throw new RuntimeException(e); } @@ -670,7 +544,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu @Override public void run2() { try { - closeCache(LuceneSearchWithRollingUpgradeDUnit.cache); + closeCache(LuceneSearchWithRollingUpgradeTestBase.cache); } catch (Exception e) { throw new RuntimeException(e); } @@ -678,12 +552,12 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu }; } - private CacheSerializableRunnable invokeRebalance() { + protected CacheSerializableRunnable invokeRebalance() { return new CacheSerializableRunnable("execute: rebalance") { @Override public void run2() { try { - rebalance(LuceneSearchWithRollingUpgradeDUnit.cache); + rebalance(LuceneSearchWithRollingUpgradeTestBase.cache); } catch (Exception e) { throw new RuntimeException(e); } @@ -691,7 +565,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu }; } - private void deleteDiskStores() { + protected void deleteDiskStores() { try { FileUtils.deleteDirectory(new File(diskDir).getAbsoluteFile()); } catch (IOException e) { @@ -699,7 +573,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu } } - private static Object createCache(Properties systemProperties) throws Exception { + protected static Object createCache(Properties systemProperties) throws Exception { Class distConfigClass = Thread.currentThread().getContextClassLoader() .loadClass("org.apache.geode.distributed.internal.DistributionConfigImpl"); @@ -725,18 +599,18 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu return cache; } - private static Object getRegion(Object cache, String regionName) throws Exception { + protected static Object getRegion(Object cache, String regionName) throws Exception { return cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName); } - private static Object put(Object cache, String regionName, Object key, Object value) + protected static Object put(Object cache, String regionName, Object key, Object value) throws Exception { Object region = getRegion(cache, regionName); return region.getClass().getMethod("put", Object.class, Object.class).invoke(region, key, value); } - private static void createRegion(Object cache, String regionName, String shortcutName) + protected static void createRegion(Object cache, String regionName, String shortcutName) throws Exception { Class aClass = Thread.currentThread().getContextClassLoader() .loadClass("org.apache.geode.cache.RegionShortcut"); @@ -791,7 +665,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu .invoke(luceneIndexFactory, indexName, regionName, true); } - private static void createPersistentPartitonedRegion(Object cache, String regionName, + protected static void createPersistentPartitonedRegion(Object cache, String regionName, File diskStore) throws Exception { Object store = cache.getClass().getMethod("findDiskStore", String.class).invoke(cache, "store"); Class dataPolicyObject = Thread.currentThread().getContextClassLoader() @@ -810,7 +684,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu rf.getClass().getMethod("create", String.class).invoke(rf, regionName); } - private static void assertVersion(Object cache, short ordinal) throws Exception { + protected static void assertVersion(Object cache, short ordinal) throws Exception { Class idmClass = Thread.currentThread().getContextClassLoader() .loadClass("org.apache.geode.distributed.internal.membership.InternalDistributedMember"); Method getDSMethod = cache.getClass().getMethod("getDistributedSystem"); @@ -832,7 +706,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu } } - private static void stopCacheServers(Object cache) throws Exception { + protected static void stopCacheServers(Object cache) throws Exception { Method getCacheServersMethod = cache.getClass().getMethod("getCacheServers"); getCacheServersMethod.setAccessible(true); List cacheServers = (List) getCacheServersMethod.invoke(cache); @@ -846,7 +720,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu } } - private static void closeCache(Object cache) throws Exception { + protected static void closeCache(Object cache) throws Exception { if (cache == null) { return; } @@ -872,7 +746,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu } } - private static void rebalance(Object cache) throws Exception { + protected static void rebalance(Object cache) throws Exception { Method getRMMethod = cache.getClass().getMethod("getResourceManager"); getRMMethod.setAccessible(true); Object manager = getRMMethod.invoke(cache); @@ -904,7 +778,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu /** * Starts a locator with given configuration. */ - private static void startLocator(final String serverHostName, final int port, + protected static void startLocator(final String serverHostName, final int port, final String testName, final String locatorsString, final Properties props) throws Exception { props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString); @@ -928,7 +802,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu startLocatorAndDSMethod.invoke(null, port, logFile, bindAddr, props, true, true, null); } - private static void startLocator(final String serverHostName, final int port, Properties props) + protected static void startLocator(final String serverHostName, final int port, Properties props) throws Exception { @@ -943,7 +817,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu Thread.sleep(5000); // bug in 1.0 - cluster config service not immediately available } - private static void stopLocator() throws Exception { + protected static void stopLocator() throws Exception { Class internalLocatorClass = Thread.currentThread().getContextClassLoader() .loadClass("org.apache.geode.distributed.internal.InternalLocator"); Method locatorMethod = internalLocatorClass.getMethod("getLocator"); @@ -959,7 +833,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu * * @return locator address */ - private static String getDUnitLocatorAddress() { + protected static String getDUnitLocatorAddress() { return Host.getHost(0).getHostName(); } diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java index cb68055..8178ea6 100644 --- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java +++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java @@ -84,7 +84,7 @@ public class RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServe locatorString); server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType, null, - shortcut.name(), regionName, locatorPorts); + shortcut.name(), regionName, locatorPorts, reindex); expectedRegionSize += 10; putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 15, 25, server2); @@ -97,7 +97,7 @@ public class RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServe 30, server1); server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType, null, - shortcut.name(), regionName, locatorPorts); + shortcut.name(), regionName, locatorPorts, reindex); expectedRegionSize += 5; putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 25, 35, server1, server2); diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion.java similarity index 81% copy from geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java copy to geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion.java index 3acba6c..368305a 100644 --- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java +++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion.java @@ -17,7 +17,6 @@ package org.apache.geode.cache.lucene; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.junit.Assert.assertTrue; -import org.junit.Ignore; import org.junit.Test; import org.apache.geode.cache.RegionShortcut; @@ -28,31 +27,40 @@ import org.apache.geode.test.dunit.DistributedTestUtils; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.NetworkUtils; import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.version.VersionManager; -public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated - extends LuceneSearchWithRollingUpgradeDUnit { +public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion + extends LuceneSearchWithRollingUpgradeTestBase { - @Ignore("Disabled until GEODE-7258 is fixed") @Test - public void test() + public void testSingleHopReindex() throws Exception { + doTest(true, true); + } + + @Test + public void testNoSingleHopReindex() throws Exception { + doTest(false, true); + } + + @Test + public void testNoSingleHopNoReindex() throws Exception { + doTest(false, false); + } + + @Test + public void testSingleHopNoReindex() throws Exception { + doTest(true, false); + } + + private void doTest(boolean singleHopEnabled, boolean reindex) throws Exception { - // This test verifies the upgrade from lucene 6 to 7 doesn't cause any issues. Without any - // changes to accomodate this upgrade, this test will fail with an IndexFormatTooNewException. - // - // The main sequence in this test that causes the failure is: - // - // - start two servers with old version using Lucene 6 - // - roll one server to new version server using Lucene 7 - // - do puts into primary buckets in new server which creates entries in the fileAndChunk region - // with Lucene 7 format - // - stop the new version server which causes the old version server to become primary for those - // buckets - // - do a query which causes the IndexFormatTooNewException to be thrown + // Since the changes relating to GEODE-7258 is not applied on 1.10.0, + // use this test to roll from develop to develop to verify. final Host host = Host.getHost(0); - VM locator = host.getVM(oldVersion, 0); - VM server1 = host.getVM(oldVersion, 1); - VM server2 = host.getVM(oldVersion, 2); - VM client = host.getVM(oldVersion, 3); + VM locator = host.getVM(VersionManager.CURRENT_VERSION, 0); + VM server1 = host.getVM(VersionManager.CURRENT_VERSION, 1); + VM server2 = host.getVM(VersionManager.CURRENT_VERSION, 2); + VM client = host.getVM(VersionManager.CURRENT_VERSION, 3); final String regionName = "aRegion"; String regionType = "partitionedRedundant"; @@ -85,7 +93,8 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server1); invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server2); invokeRunnableInVMs( - invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false), + invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false, + singleHopEnabled), client); // Create the index on the servers @@ -107,7 +116,7 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol locator = rollLocatorToCurrent(locator, hostName, locatorPorts[0], getTestMethodName(), locatorString); server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType, null, - shortcut.name(), regionName, locatorPorts); + shortcut.name(), regionName, locatorPorts, reindex); // Execute a query on the client and verify the results. This also waits until flushed. client.invoke(() -> verifyLuceneQueryResults(regionName, numObjects)); diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java index 95c0498..df1e329 100644 --- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java +++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java @@ -70,7 +70,8 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server3); invokeRunnableInVMs( - invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false), + invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false, + singleHopEnabled), client); server2.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME)); server3.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME)); @@ -87,7 +88,7 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol locatorString); server3 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server3, regionType, null, - shortcut.name(), regionName, locatorPorts); + shortcut.name(), regionName, locatorPorts, reindex); invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server3); expectedRegionSize += 10; putSerializableObjectAndVerifyLuceneQueryResult(client, regionName, expectedRegionSize, 20, @@ -97,7 +98,7 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol 40, server2); server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType, null, - shortcut.name(), regionName, locatorPorts); + shortcut.name(), regionName, locatorPorts, reindex); invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server2); expectedRegionSize += 10; putSerializableObjectAndVerifyLuceneQueryResult(client, regionName, expectedRegionSize, 40, @@ -107,7 +108,7 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol 60, server3); client = rollClientToCurrentAndCreateRegion(client, ClientRegionShortcut.PROXY, regionName, - hostNames, locatorPorts, false); + hostNames, locatorPorts, false, singleHopEnabled); expectedRegionSize += 10; putSerializableObjectAndVerifyLuceneQueryResult(client, regionName, expectedRegionSize, 60, 70, server2, server3); diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java index 3acba6c..430e8fa 100644 --- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java +++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java @@ -17,7 +17,6 @@ package org.apache.geode.cache.lucene; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.junit.Assert.assertTrue; -import org.junit.Ignore; import org.junit.Test; import org.apache.geode.cache.RegionShortcut; @@ -32,7 +31,6 @@ import org.apache.geode.test.dunit.VM; public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated extends LuceneSearchWithRollingUpgradeDUnit { - @Ignore("Disabled until GEODE-7258 is fixed") @Test public void test() throws Exception { @@ -85,7 +83,8 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server1); invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server2); invokeRunnableInVMs( - invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false), + invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false, + singleHopEnabled), client); // Create the index on the servers @@ -107,7 +106,7 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol locator = rollLocatorToCurrent(locator, hostName, locatorPorts[0], getTestMethodName(), locatorString); server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType, null, - shortcut.name(), regionName, locatorPorts); + shortcut.name(), regionName, locatorPorts, reindex); // Execute a query on the client and verify the results. This also waits until flushed. client.invoke(() -> verifyLuceneQueryResults(regionName, numObjects));
