[
https://issues.apache.org/jira/browse/GEODE-3308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16110085#comment-16110085
]
ASF GitHub Bot commented on GEODE-3308:
---------------------------------------
Github user jhuynh1 commented on a diff in the pull request:
https://github.com/apache/geode/pull/659#discussion_r130765402
--- Diff:
geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
---
@@ -0,0 +1,1044 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for
additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
ANY KIND, either express
+ * or implied. See the License for the specific language governing
permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene;
+
+import static org.apache.geode.test.dunit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.Logger;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.geode.cache.GemFireCache;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache30.CacheSerializableRunnable;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.Invoke;
+import org.apache.geode.test.dunit.NetworkUtils;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.dunit.standalone.DUnitLauncher;
+import org.apache.geode.test.dunit.standalone.VersionManager;
+import org.apache.geode.test.junit.categories.BackwardCompatibilityTest;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import
org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
+
+@Category({DistributedTest.class, BackwardCompatibilityTest.class})
+@RunWith(Parameterized.class)
[email protected](CategoryWithParameterizedRunnerFactory.class)
+public class LuceneSearchWithRollingUpgradeDUnit extends
JUnit4DistributedTestCase {
+
+
+ @Parameterized.Parameters
+ public static Collection<String> data() {
+ List<String> result =
VersionManager.getInstance().getVersionsWithoutCurrent();
+ // Lucene Compatibility checks start with Apache Geode v1.2.0
+ // Removing the versions older than v1.2.0
+ result.removeIf(s -> Integer.parseInt(s) < 120);
+ if (result.size() < 1) {
+ throw new RuntimeException("No older versions of Geode were found to
test against");
+ } else {
+ System.out.println("running against these versions: " + result);
+ }
+ return result;
+ }
+
+ private File[] testingDirs = new File[3];
+
+ private static String INDEX_NAME = "index";
+
+ private static String diskDir = "LuceneSearchWithRollingUpgradeDUnit";
+
+ // Each vm will have a cache object
+ private static Object cache;
+
+ // the old version of Geode we're testing against
+ private String oldVersion;
+
+ private void deleteVMFiles() throws Exception {
+ System.out.println("deleting files in vm" + VM.getCurrentVMNum());
+ File pwd = new File(".");
+ for (File entry : pwd.listFiles()) {
+ try {
+ if (entry.isDirectory()) {
+ FileUtils.deleteDirectory(entry);
+ } else {
+ entry.delete();
+ }
+ } catch (Exception e) {
+ System.out.println("Could not delete " + entry + ": " +
e.getMessage());
+ }
+ }
+ }
+
+ private void deleteWorkingDirFiles() throws Exception {
+ Invoke.invokeInEveryVM("delete files", () -> deleteVMFiles());
+ }
+
+ @Override
+ public void postSetUp() throws Exception {
+ deleteWorkingDirFiles();
+ IgnoredException.addIgnoredException(
+ "cluster configuration service not
available|ConflictingPersistentDataException");
+ }
+
+ public LuceneSearchWithRollingUpgradeDUnit(String version) {
+ oldVersion = version;
+ }
+
+ @Test
+ public void
luceneQueryReturnsCorrectResultsAfterServersRollOverOnPartitionRegion()
+ throws Exception {
+ executeLuceneQueryWithServerRollOvers("partitionedRedundant",
oldVersion);
+ }
+
+ @Test
+ public void
luceneQueryReturnsCorrectResultsAfterServersRollOverOnPersistentPartitionRegion()
+ throws Exception {
+ executeLuceneQueryWithServerRollOvers("persistentPartitioned",
oldVersion);
+ }
+
+ // 2 locator, 2 servers
+ @Test
+ public void
luceneQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled()
+ throws Exception {
+ final Host host = Host.getHost(0);
+ VM locator1 = host.getVM(oldVersion, 0);
+ VM locator2 = host.getVM(oldVersion, 1);
+ VM server1 = host.getVM(oldVersion, 2);
+ VM server2 = host.getVM(oldVersion, 3);
+
+ final String regionName = "aRegion";
+ RegionShortcut shortcut = RegionShortcut.PARTITION_REDUNDANT;
+ String regionType = "partitionedRedundant";
+
+ int[] locatorPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+ DistributedTestUtils.deleteLocatorStateFile(locatorPorts);
+
+ String hostName = NetworkUtils.getServerHostName(host);
+ String locatorString = getLocatorString(locatorPorts);
+ try {
+ locator1.invoke(
+ invokeStartLocator(hostName, locatorPorts[0],
getLocatorPropertiesPre91(locatorString)));
+ locator2.invoke(
+ invokeStartLocator(hostName, locatorPorts[1],
getLocatorPropertiesPre91(locatorString)));
+
invokeRunnableInVMs(invokeCreateCache(getSystemProperties(locatorPorts)),
server1, server2);
+
+ server1.invoke(() -> createLuceneIndex(cache, regionName,
INDEX_NAME));
+ server2.invoke(() -> createLuceneIndex(cache, regionName,
INDEX_NAME));
+
+ invokeRunnableInVMs(invokeCreateRegion(regionName, shortcut.name()),
server1, server2);
+ int expectedRegionSize = 10;
+ putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName,
expectedRegionSize, 0,
+ 10, server1, server2);
+ locator1 = rollLocatorToCurrent(locator1, hostName, locatorPorts[0],
getTestMethodName(),
+ locatorString);
+
+ locator2 = rollLocatorToCurrent(locator2, hostName, locatorPorts[1],
getTestMethodName(),
+ locatorString);
+
+ server1 = rollServerToCurrentAndCreateRegion(server1, regionType,
null, shortcut.name(),
+ regionName, locatorPorts);
+ expectedRegionSize += 10;
+ putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName,
expectedRegionSize, 15,
+ 25, server1, server2);
+ expectedRegionSize += 5;
+ putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName,
expectedRegionSize, 20,
+ 30, server1, server2);
+
+ server2 = rollServerToCurrentAndCreateRegion(server2, regionType,
null, shortcut.name(),
+ regionName, locatorPorts);
+ expectedRegionSize += 5;
+ putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName,
expectedRegionSize, 25,
+ 35, server1, server2);
+ expectedRegionSize += 5;
+ putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName,
expectedRegionSize, 30,
+ 40, server1, server2);
+
+ } finally {
+ invokeRunnableInVMs(true, invokeStopLocator(), locator1, locator2);
+ invokeRunnableInVMs(true, invokeCloseCache(), server1, server2);
+ }
+ }
+
+
+ public Properties getLocatorPropertiesPre91(String locatorsString) {
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+ props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString);
+ props.setProperty(DistributionConfig.LOG_LEVEL_NAME,
DUnitLauncher.logLevel);
+
props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true");
+ return props;
+ }
+
+ @Test
+ public void
luceneQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver()
+ throws Exception {
+ final Host host = Host.getHost(0);
+ VM locator = host.getVM(oldVersion, 0);
+ VM server2 = host.getVM(oldVersion, 1);
+ VM server3 = host.getVM(oldVersion, 2);
+ VM client = host.getVM(oldVersion, 3);
+
+ final String regionName = "aRegion";
+ String regionType = "partitionedRedundant";
+ RegionShortcut shortcut = RegionShortcut.PARTITION_REDUNDANT;
+
+ int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+ int[] locatorPorts = new int[] {ports[0]};
+ int[] csPorts = new int[] {ports[1], ports[2]};
+
+ DistributedTestUtils.deleteLocatorStateFile(locatorPorts);
+
+ String hostName = NetworkUtils.getServerHostName(host);
+ String[] hostNames = new String[] {hostName};
+ String locatorString = getLocatorString(locatorPorts);
+ try {
+ locator.invoke(
+ invokeStartLocator(hostName, locatorPorts[0],
getLocatorPropertiesPre91(locatorString)));
+
invokeRunnableInVMs(invokeCreateCache(getSystemProperties(locatorPorts)),
server2, server3);
+ invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server2);
+ invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server3);
+
+ invokeRunnableInVMs(
+ invokeCreateClientCache(getClientSystemProperties(), hostNames,
locatorPorts, false),
+ client);
+ server2.invoke(() -> createLuceneIndex(cache, regionName,
INDEX_NAME));
+ server3.invoke(() -> createLuceneIndex(cache, regionName,
INDEX_NAME));
+
+ invokeRunnableInVMs(invokeCreateRegion(regionName, shortcut.name()),
server2, server3);
+ invokeRunnableInVMs(invokeCreateClientRegion(regionName,
ClientRegionShortcut.PROXY), client);
+ int expectedRegionSize = 10;
+ putSerializableObjectAndVerifyLuceneQueryResult(client, regionName,
expectedRegionSize, 0, 10,
+ server3);
+ expectedRegionSize += 10;
+ putSerializableObjectAndVerifyLuceneQueryResult(server3, regionName,
expectedRegionSize, 10,
+ 20, server2);
+ locator = rollLocatorToCurrent(locator, hostName, locatorPorts[0],
getTestMethodName(),
+ locatorString);
+
+ server3 = rollServerToCurrentAndCreateRegion(server3, regionType,
null, shortcut.name(),
+ regionName, locatorPorts);
+ invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server3);
+ expectedRegionSize += 10;
+ putSerializableObjectAndVerifyLuceneQueryResult(client, regionName,
expectedRegionSize, 20,
+ 30, server3, server2);
+ expectedRegionSize += 10;
+ putSerializableObjectAndVerifyLuceneQueryResult(server3, regionName,
expectedRegionSize, 30,
+ 40, server2);
+
+ server2 = rollServerToCurrentAndCreateRegion(server2, regionType,
null, shortcut.name(),
+ regionName, locatorPorts);
+ invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server2);
+ expectedRegionSize += 10;
+ putSerializableObjectAndVerifyLuceneQueryResult(client, regionName,
expectedRegionSize, 40,
+ 50, server2, server3);
+ expectedRegionSize += 10;
+ putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName,
expectedRegionSize, 50,
+ 60, server3);
+
+ client = rollClientToCurrentAndCreateRegion(client,
ClientRegionShortcut.PROXY, regionName,
+ hostNames, locatorPorts, false);
+ expectedRegionSize += 10;
+ putSerializableObjectAndVerifyLuceneQueryResult(client, regionName,
expectedRegionSize, 60,
+ 70, server2, server3);
+ expectedRegionSize += 10;
+ putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName,
expectedRegionSize, 70,
+ 80, server3);
+
+ } finally {
+ invokeRunnableInVMs(true, invokeStopLocator(), locator);
+ invokeRunnableInVMs(true, invokeCloseCache(), server2, server3,
client);
+ }
+ }
+
+ private VM rollClientToCurrentAndCreateRegion(VM oldClient,
ClientRegionShortcut shortcut,
+ String regionName, String[] hostNames, int[] locatorPorts, boolean
subscriptionEnabled)
+ throws Exception {
+ VM rollClient = rollClientToCurrent(oldClient, hostNames,
locatorPorts, subscriptionEnabled);
+ // recreate region on "rolled" client
+ invokeRunnableInVMs(invokeCreateClientRegion(regionName, shortcut),
rollClient);
+ return rollClient;
+ }
+
+ private VM rollClientToCurrent(VM oldClient, String[] hostNames, int[]
locatorPorts,
+ boolean subscriptionEnabled) throws Exception {
+ oldClient.invoke(invokeCloseCache());
+ VM rollClient = Host.getHost(0).getVM(oldClient.getPid());
+ rollClient.invoke(invokeCreateClientCache(getClientSystemProperties(),
hostNames, locatorPorts,
+ subscriptionEnabled));
+ rollClient.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
+ return rollClient;
+ }
+
+ private CacheSerializableRunnable invokeCreateClientRegion(final String
regionName,
+ final ClientRegionShortcut shortcut) {
+ return new CacheSerializableRunnable("execute: createClientRegion") {
+ public void run2() {
+ try {
+ createClientRegion((GemFireCache)
LuceneSearchWithRollingUpgradeDUnit.cache, regionName,
+ shortcut);
+ } catch (Exception e) {
+ fail("Error creating client region", e);
+ }
+ }
+ };
+ }
+
+ public static void createClientRegion(GemFireCache cache, String
regionName,
+ ClientRegionShortcut shortcut) throws Exception {
+ ClientRegionFactory rf = ((ClientCache)
cache).createClientRegionFactory(shortcut);
+ rf.create(regionName);
+ }
+
+ private CacheSerializableRunnable invokeStartCacheServer(final int port)
{
+ return new CacheSerializableRunnable("execute: startCacheServer") {
+ public void run2() {
+ try {
+ startCacheServer((GemFireCache)
LuceneSearchWithRollingUpgradeDUnit.cache, port);
+ } catch (Exception e) {
+ fail("Error creating cache", e);
+ }
+ }
+ };
+ }
+
+ public static void startCacheServer(GemFireCache cache, int port) throws
Exception {
+ CacheServer cacheServer = ((GemFireCacheImpl) cache).addCacheServer();
+ cacheServer.setPort(port);
+ cacheServer.start();
+ }
+
+ private CacheSerializableRunnable invokeCreateClientCache(final
Properties systemProperties,
+ final String[] hosts, final int[] ports, boolean
subscriptionEnabled) {
+ return new CacheSerializableRunnable("execute: createClientCache") {
+ public void run2() {
+ try {
+ LuceneSearchWithRollingUpgradeDUnit.cache =
+ createClientCache(systemProperties, hosts, ports,
subscriptionEnabled);
+ } catch (Exception e) {
+ fail("Error creating client cache", e);
+ }
+ }
+ };
+ }
+
+ public Properties getClientSystemProperties() {
+ Properties p = new Properties();
+ p.setProperty("mcast-port", "0");
+ return p;
+ }
+
+
+ public static ClientCache createClientCache(Properties systemProperties,
String[] hosts,
+ int[] ports, boolean subscriptionEnabled) throws Exception {
+ ClientCacheFactory cf = new ClientCacheFactory(systemProperties);
+ if (subscriptionEnabled) {
+ cf.setPoolSubscriptionEnabled(true);
+ cf.setPoolSubscriptionRedundancy(-1);
+ }
+ int hostsLength = hosts.length;
+ for (int i = 0; i < hostsLength; i++) {
+ cf.addPoolLocator(hosts[i], ports[i]);
+ }
+
+ return cf.create();
+ }
+
+
+
+ // We start an "old" locator and old servers
+ // We roll the locator
+ // Now we roll all the servers from old to new
+ public void executeLuceneQueryWithServerRollOvers(String regionType,
String startingVersion)
+ throws Exception {
+ final Host host = Host.getHost(0);
+ VM server1 = host.getVM(startingVersion, 0);
+ VM server2 = host.getVM(startingVersion, 1);
+ VM server3 = host.getVM(startingVersion, 2);
+ VM locator = host.getVM(startingVersion, 3);
+
+
+ String regionName = "aRegion";
+ String shortcutName = null;
+ if ((regionType.equals("partitionedRedundant"))) {
+ shortcutName = RegionShortcut.PARTITION_REDUNDANT.name();
+ } else if ((regionType.equals("persistentPartitioned"))) {
+ shortcutName = RegionShortcut.PARTITION_PERSISTENT.name();
+ for (int i = 0; i < testingDirs.length; i++) {
+ testingDirs[i] = new File(diskDir, "diskStoreVM_" +
String.valueOf(host.getVM(i).getPid()))
+ .getAbsoluteFile();
+ if (!testingDirs[i].exists()) {
+ System.out.println(" Creating diskdir for server: " + i);
+ testingDirs[i].mkdirs();
+ }
+ }
+ }
+
+ int[] locatorPorts = AvailablePortHelper.getRandomAvailableTCPPorts(1);
+ String hostName = NetworkUtils.getServerHostName(host);
+ String locatorString = getLocatorString(locatorPorts);
+ final Properties locatorProps = new Properties();
+ // configure all class loaders for each vm
+
+ try {
+ locator.invoke(invokeStartLocator(hostName, locatorPorts[0],
getTestMethodName(),
+ locatorString, locatorProps));
+
invokeRunnableInVMs(invokeCreateCache(getSystemProperties(locatorPorts)),
server1, server2,
+ server3);
+
+ // Create Lucene Index
+ server1.invoke(() -> createLuceneIndex(cache, regionName,
INDEX_NAME));
+ server2.invoke(() -> createLuceneIndex(cache, regionName,
INDEX_NAME));
+ server3.invoke(() -> createLuceneIndex(cache, regionName,
INDEX_NAME));
+
+ // create region
+ if ((regionType.equals("persistentPartitioned"))) {
+ for (int i = 0; i < testingDirs.length; i++) {
+ CacheSerializableRunnable runnable =
+ invokeCreatePersistentPartitionedRegion(regionName,
testingDirs[i]);
+ invokeRunnableInVMs(runnable, host.getVM(i));
+ }
+ } else {
+ invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName),
server1, server2,
+ server3);
+ }
+ int expectedRegionSize = 10;
+ putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName,
expectedRegionSize, 0,
+ 10, server2, server3);
+ locator = rollLocatorToCurrent(locator, hostName, locatorPorts[0],
getTestMethodName(),
+ locatorString);
+
+ server1 = rollServerToCurrentAndCreateRegion(server1, regionType,
testingDirs[0],
+ shortcutName, regionName, locatorPorts);
+ verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize,
server1);
+ expectedRegionSize += 5;
+ putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName,
expectedRegionSize, 5,
+ 15, server2, server3);
+ expectedRegionSize += 5;
+ putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName,
expectedRegionSize, 10,
+ 20, server1, server3);
+
+ server2 = rollServerToCurrentAndCreateRegion(server2, regionType,
testingDirs[1],
+ shortcutName, regionName, locatorPorts);
+ verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize,
server2);
+ expectedRegionSize += 5;
+ putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName,
expectedRegionSize, 15,
+ 25, server1, server3);
+ expectedRegionSize += 5;
+ putSerializableObjectAndVerifyLuceneQueryResult(server3, regionName,
expectedRegionSize, 20,
+ 30, server2, server3);
+
+ server3 = rollServerToCurrentAndCreateRegion(server3, regionType,
testingDirs[2],
+ shortcutName, regionName, locatorPorts);
+ verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize,
server3);
+ putSerializableObjectAndVerifyLuceneQueryResult(server3, regionName,
expectedRegionSize, 15,
+ 25, server1, server2);
+ putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName,
expectedRegionSize, 20,
+ 30, server1, server2, server3);
+
+
+ } finally {
+ invokeRunnableInVMs(true, invokeStopLocator(), locator);
+ invokeRunnableInVMs(true, invokeCloseCache(), server1, server2,
server3);
+ if ((regionType.equals("persistentPartitioned"))) {
+ deleteDiskStores();
+ }
+ }
+ }
+
+ private void putSerializableObjectAndVerifyLuceneQueryResult(VM putter,
String regionName,
+ int expectedRegionSize, int start, int end, VM... vms) throws
Exception {
+ for (int i = start; i < end; i++) {
+ Class aClass = Thread.currentThread().getContextClassLoader()
+ .loadClass("org.apache.geode.cache.query.data.Portfolio");
+ Constructor portfolioConstructor = aClass.getConstructor(int.class);
+ Object serializableObject = portfolioConstructor.newInstance(i);
+ putter.invoke(invokePut(regionName, "" + i, serializableObject));
+ }
+ // verify present in others
+ verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, vms);
+ }
+
+ private void waitForRegionToHaveExpectedSize(String regionName, int
expectedRegionSize) {
+ Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+ try {
+ Object region =
+ cache.getClass().getMethod("getRegion",
String.class).invoke(cache, regionName);
+ int regionSize = (int)
region.getClass().getMethod("size").invoke(region);
+ assertEquals("Region size not as expected after 60 seconds",
expectedRegionSize,
+ regionSize);
+ } catch (Exception e) {
+ throw new RuntimeException();
+ }
+
+ });
+ }
+
+ private void verifyLuceneQueryResults(String regionName, int
expectedRegionSize)
+ throws Exception {
+ Class luceneServiceProvider =
Thread.currentThread().getContextClassLoader()
+ .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
+ Method getLuceneService = luceneServiceProvider.getMethod("get",
GemFireCache.class);
+ Object luceneService = getLuceneService.invoke(luceneServiceProvider,
cache);
+ luceneService.getClass()
+ .getMethod("waitUntilFlushed", String.class, String.class,
long.class, TimeUnit.class)
+ .invoke(luceneService, INDEX_NAME, regionName, 60,
TimeUnit.SECONDS);
+ Method createLuceneQueryFactoryMethod =
+ luceneService.getClass().getMethod("createLuceneQueryFactory");
+ createLuceneQueryFactoryMethod.setAccessible(true);
+ Object luceneQueryFactory =
createLuceneQueryFactoryMethod.invoke(luceneService);
+ Object luceneQuery = luceneQueryFactory.getClass()
+ .getMethod("create", String.class, String.class, String.class,
String.class)
+ .invoke(luceneQueryFactory, INDEX_NAME, regionName, "active",
"status");
+
+ Collection resultsActive =
+ (Collection)
luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery);
+
+ luceneQuery = luceneQueryFactory.getClass()
+ .getMethod("create", String.class, String.class, String.class,
String.class)
+ .invoke(luceneQueryFactory, INDEX_NAME, regionName, "inactive",
"status");
+
+ Collection resultsInactive =
+ (Collection)
luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery);
+
+ assertEquals("Result size not as expected ", expectedRegionSize,
+ resultsActive.size() + resultsInactive.size());
+ }
+
+ private void verifyLuceneQueryResultInEachVM(String regionName, int
expectedRegionSize,
+ VM... vms) {
+ for (VM vm : vms) {
+ vm.invoke(() -> waitForRegionToHaveExpectedSize(regionName,
expectedRegionSize));
+ vm.invoke(() -> verifyLuceneQueryResults(regionName,
expectedRegionSize));
+ }
+
+ }
+
+ private void invokeRunnableInVMs(CacheSerializableRunnable runnable,
VM... vms) throws Exception {
+ for (VM vm : vms) {
+ vm.invoke(runnable);
+ }
+ }
+
+ // Used to close cache and make sure we attempt on all vms even if some
do not have a cache
+ private void invokeRunnableInVMs(boolean catchErrors,
CacheSerializableRunnable runnable,
+ VM... vms) throws Exception {
+ for (VM vm : vms) {
+ try {
+ vm.invoke(runnable);
+ } catch (Exception e) {
+ if (!catchErrors) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ private VM rollServerToCurrent(VM oldServer, int[] locatorPorts) throws
Exception {
+ // Roll the server
+ oldServer.invoke(invokeCloseCache());
+ VM rollServer = Host.getHost(0).getVM(oldServer.getPid()); // gets a
vm with the current version
+ rollServer.invoke(invokeCreateCache(locatorPorts == null ?
getSystemPropertiesPost71()
+ : getSystemPropertiesPost71(locatorPorts)));
+ rollServer.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
+ return rollServer;
+ }
+
+ private VM rollServerToCurrentAndCreateRegion(VM oldServer, String
regionType, File diskdir,
+ String shortcutName, String regionName, int[] locatorPorts) throws
Exception {
+ VM rollServer = rollServerToCurrent(oldServer, locatorPorts);
+ rollServer.invoke(() -> createLuceneIndex(cache, regionName,
INDEX_NAME));
+ // recreate region on "rolled" server
+ if ((regionType.equals("persistentPartitioned"))) {
+ CacheSerializableRunnable runnable =
+ invokeCreatePersistentPartitionedRegion(regionName, diskdir);
+ invokeRunnableInVMs(runnable, rollServer);
+ } else {
+ invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName),
rollServer);
+ }
+ rollServer.invoke(invokeRebalance());
+ return rollServer;
+ }
+
+ private VM rollLocatorToCurrent(VM oldLocator, final String
serverHostName, final int port,
+ final String testName, final String locatorString) throws Exception {
+ // Roll the locator
+ oldLocator.invoke(invokeStopLocator());
+ VM rollLocator = Host.getHost(0).getVM(oldLocator.getPid()); // gets a
VM with current version
+ final Properties props = new Properties();
+
props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME,
"false");
+ rollLocator.invoke(invokeStartLocator(serverHostName, port, testName,
locatorString, props));
+ return rollLocator;
+ }
+
+ // Due to licensing changes
+ public Properties getSystemPropertiesPost71() {
+ Properties props = getSystemProperties();
+ return props;
+ }
+
+ // Due to licensing changes
+ public Properties getSystemPropertiesPost71(int[] locatorPorts) {
+ Properties props = getSystemProperties(locatorPorts);
+ return props;
+ }
+
+ public Properties getSystemProperties() {
+ Properties props =
DistributedTestUtils.getAllDistributedSystemProperties(new Properties());
+ props.remove("disable-auto-reconnect");
+ props.remove(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME);
+ props.remove(DistributionConfig.LOCK_MEMORY_NAME);
+ return props;
+ }
+
+ public Properties getSystemProperties(int[] locatorPorts) {
+ Properties p = new Properties();
+ String locatorString = getLocatorString(locatorPorts);
+ p.setProperty("locators", locatorString);
+ p.setProperty("mcast-port", "0");
+ return p;
+ }
+
+ public static String getLocatorString(int locatorPort) {
+ String locatorString = getDUnitLocatorAddress() + "[" + locatorPort +
"]";
+ return locatorString;
+ }
+
+ public static String getLocatorString(int[] locatorPorts) {
+ StringBuilder locatorString = new StringBuilder();
+ int numLocators = locatorPorts.length;
+ for (int i = 0; i < numLocators; i++) {
+ locatorString.append(getLocatorString(locatorPorts[i]));
+ if (i + 1 < numLocators) {
+ locatorString.append(",");
+ }
+ }
+ return locatorString.toString();
+ }
+
+ private CacheSerializableRunnable invokeStartLocator(final String
serverHostName, final int port,
+ final String testName, final String locatorsString, final Properties
props) {
+ return new CacheSerializableRunnable("execute: startLocator") {
+ public void run2() {
+ try {
+ startLocator(serverHostName, port, testName, locatorsString,
props);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private CacheSerializableRunnable invokeStartLocator(final String
serverHostName, final int port,
+ final Properties props) {
+ return new CacheSerializableRunnable("execute: startLocator") {
+ public void run2() {
+ try {
+ startLocator(serverHostName, port, props);
+ } catch (Exception e) {
+ fail("Error starting locators", e);
+ }
+ }
+ };
+ }
+
+ private CacheSerializableRunnable invokeCreateCache(final Properties
systemProperties) {
+ return new CacheSerializableRunnable("execute: createCache") {
+ public void run2() {
+ try {
+ LuceneSearchWithRollingUpgradeDUnit.cache =
createCache(systemProperties);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private CacheSerializableRunnable invokeAssertVersion(final short
version) {
+ return new CacheSerializableRunnable("execute: assertVersion") {
+ public void run2() {
+ try {
+ assertVersion(LuceneSearchWithRollingUpgradeDUnit.cache,
version);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private CacheSerializableRunnable invokeCreateRegion(final String
regionName,
+ final String shortcutName) {
+ return new CacheSerializableRunnable("execute: createRegion") {
+ public void run2() {
+ try {
+ createRegion(LuceneSearchWithRollingUpgradeDUnit.cache,
regionName, shortcutName);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private CacheSerializableRunnable
invokeCreatePersistentPartitionedRegion(final String regionName,
+ final File diskstore) {
+ return new CacheSerializableRunnable("execute:
createPersistentPartitonedRegion") {
+ public void run2() {
+ try {
+
createPersistentPartitonedRegion(LuceneSearchWithRollingUpgradeDUnit.cache,
regionName,
+ diskstore);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private CacheSerializableRunnable invokePut(final String regionName,
final Object key,
+ final Object value) {
+ return new CacheSerializableRunnable("execute: put") {
+ public void run2() {
+ try {
+ put(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, key,
value);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private CacheSerializableRunnable invokeStopLocator() {
+ return new CacheSerializableRunnable("execute: stopLocator") {
+ public void run2() {
+ try {
+ stopLocator();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private CacheSerializableRunnable invokeCloseCache() {
+ return new CacheSerializableRunnable("execute: closeCache") {
+ public void run2() {
+ try {
+ closeCache(LuceneSearchWithRollingUpgradeDUnit.cache);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private CacheSerializableRunnable invokeRebalance() {
+ return new CacheSerializableRunnable("execute: rebalance") {
+ public void run2() {
+ try {
+ rebalance(LuceneSearchWithRollingUpgradeDUnit.cache);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ public void deleteDiskStores() throws Exception {
+ try {
+ FileUtils.deleteDirectory(new File(diskDir).getAbsoluteFile());
+ } catch (IOException e) {
+ throw new Error("Error deleting files", e);
+ }
+ }
+
+ public static Object createCache(Properties systemProperties) throws
Exception {
+
+ Class distConfigClass = Thread.currentThread().getContextClassLoader()
+
.loadClass("org.apache.geode.distributed.internal.DistributionConfigImpl");
+ boolean disableConfig = true;
+ try {
+ distConfigClass.getDeclaredField("useSharedConfiguration");
+ } catch (NoSuchFieldException e) {
+ disableConfig = false;
+ }
+ if (disableConfig) {
+
systemProperties.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME,
"false");
+ }
+
+ Class cacheFactoryClass =
Thread.currentThread().getContextClassLoader()
+ .loadClass("org.apache.geode.cache.CacheFactory");
+ Constructor constructor =
cacheFactoryClass.getConstructor(Properties.class);
+ constructor.setAccessible(true);
+ Object cacheFactory = constructor.newInstance(systemProperties);
+
+ Method createMethod = cacheFactoryClass.getMethod("create");
+ createMethod.setAccessible(true);
+ Object cache = createMethod.invoke(cacheFactory);
+ return cache;
+ }
+
+ public static Object getRegion(Object cache, String regionName) throws
Exception {
+ return cache.getClass().getMethod("getRegion",
String.class).invoke(cache, regionName);
+ }
+
+ public static Object put(Object cache, String regionName, Object key,
Object value)
+ throws Exception {
+ Object region = getRegion(cache, regionName);
+ return region.getClass().getMethod("put", Object.class,
Object.class).invoke(region, key,
+ value);
+ }
+
+ public static void createRegion(Object cache, String regionName, String
shortcutName)
+ throws Exception {
+ Class aClass = Thread.currentThread().getContextClassLoader()
+ .loadClass("org.apache.geode.cache.RegionShortcut");
+ Object[] enumConstants = aClass.getEnumConstants();
+ Object shortcut = null;
+ int length = enumConstants.length;
+ for (int i = 0; i < length; i++) {
+ Object constant = enumConstants[i];
+ if (((Enum) constant).name().equals(shortcutName)) {
+ shortcut = constant;
+ break;
+ }
+ }
+
+ Method createRegionFactoryMethod =
cache.getClass().getMethod("createRegionFactory", aClass);
+ createRegionFactoryMethod.setAccessible(true);
+ Object regionFactory = createRegionFactoryMethod.invoke(cache,
shortcut);
+ Method createMethod = regionFactory.getClass().getMethod("create",
String.class);
+ createMethod.setAccessible(true);
+ createMethod.invoke(regionFactory, regionName);
+ }
+
+ public static void createLuceneIndex(Object cache, String regionName,
String indexName)
+ throws Exception {
+ Class luceneServiceProvider =
Thread.currentThread().getContextClassLoader()
+ .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
+ Method getLuceneService = luceneServiceProvider.getMethod("get",
GemFireCache.class);
+ Object luceneService = getLuceneService.invoke(luceneServiceProvider,
cache);
+ Method createLuceneIndexFactoryMethod =
+ luceneService.getClass().getMethod("createIndexFactory");
+ createLuceneIndexFactoryMethod.setAccessible(true);
+ Object luceneIndexFactory =
createLuceneIndexFactoryMethod.invoke(luceneService);
+ luceneIndexFactory.getClass().getMethod("addField",
String.class).invoke(luceneIndexFactory,
+ "status");
+ luceneIndexFactory.getClass().getMethod("create", String.class,
String.class)
+ .invoke(luceneIndexFactory, indexName, regionName);
+ }
+
+ public static void createPersistentPartitonedRegion(Object cache, String
regionName,
+ File diskStore) throws Exception {
+ Object store = cache.getClass().getMethod("findDiskStore",
String.class).invoke(cache, "store");
+ Class dataPolicyObject = Thread.currentThread().getContextClassLoader()
+ .loadClass("org.apache.geode.cache.DataPolicy");
+ Object dataPolicy =
dataPolicyObject.getField("PERSISTENT_PARTITION").get(null);
+ if (store == null) {
+ Object dsf =
cache.getClass().getMethod("createDiskStoreFactory").invoke(cache);
+ dsf.getClass().getMethod("setMaxOplogSize", long.class).invoke(dsf,
1L);
+ dsf.getClass().getMethod("setDiskDirs", File[].class).invoke(dsf,
+ new Object[] {new File[] {diskStore.getAbsoluteFile()}});
+ dsf.getClass().getMethod("create", String.class).invoke(dsf,
"store");
+ }
+ Object rf =
cache.getClass().getMethod("createRegionFactory").invoke(cache);
+ rf.getClass().getMethod("setDiskStoreName", String.class).invoke(rf,
"store");
+ rf.getClass().getMethod("setDataPolicy",
dataPolicy.getClass()).invoke(rf, dataPolicy);
+ rf.getClass().getMethod("create", String.class).invoke(rf, regionName);
+ }
+
+ public static void assertVersion(Object cache, short ordinal) throws
Exception {
+ Class idmClass = Thread.currentThread().getContextClassLoader()
+
.loadClass("org.apache.geode.distributed.internal.membership.InternalDistributedMember");
+ Method getDSMethod =
cache.getClass().getMethod("getDistributedSystem");
+ getDSMethod.setAccessible(true);
+ Object ds = getDSMethod.invoke(cache);
+
+ Method getDistributedMemberMethod =
ds.getClass().getMethod("getDistributedMember");
+ getDistributedMemberMethod.setAccessible(true);
+ Object member = getDistributedMemberMethod.invoke(ds);
+ Method getVersionObjectMethod =
member.getClass().getMethod("getVersionObject");
+ getVersionObjectMethod.setAccessible(true);
+ Object thisVersion = getVersionObjectMethod.invoke(member);
+ Method getOrdinalMethod = thisVersion.getClass().getMethod("ordinal");
+ getOrdinalMethod.setAccessible(true);
+ short thisOrdinal = (Short) getOrdinalMethod.invoke(thisVersion);
+ if (ordinal != thisOrdinal) {
+ throw new Error(
+ "Version ordinal:" + thisOrdinal + " was not the expected
ordinal of:" + ordinal);
+ }
+ }
+
+ public static void stopCacheServers(Object cache) throws Exception {
+ Method getCacheServersMethod =
cache.getClass().getMethod("getCacheServers");
+ getCacheServersMethod.setAccessible(true);
+ List cacheServers = (List) getCacheServersMethod.invoke(cache);
+ Method stopMethod = null;
+ for (Object cs : cacheServers) {
+ if (stopMethod == null) {
+ stopMethod = cs.getClass().getMethod("stop");
+ }
+ stopMethod.setAccessible(true);
+ stopMethod.invoke(cs);
+ }
+ }
+
+ public static void closeCache(Object cache) throws Exception {
+ if (cache == null) {
+ return;
+ }
+ Method isClosedMethod = cache.getClass().getMethod("isClosed");
+ isClosedMethod.setAccessible(true);
+ boolean cacheClosed = (Boolean) isClosedMethod.invoke(cache);
+ if (cache != null && !cacheClosed) {
+ stopCacheServers(cache);
+ Method method = cache.getClass().getMethod("close");
+ method.setAccessible(true);
+ method.invoke(cache);
+ long startTime = System.currentTimeMillis();
+ while (!cacheClosed && System.currentTimeMillis() - startTime <
30000) {
+ try {
+ Thread.sleep(1000);
+ Method cacheClosedMethod =
cache.getClass().getMethod("isClosed");
+ cacheClosedMethod.setAccessible(true);
+ cacheClosed = (Boolean) cacheClosedMethod.invoke(cache);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ public static void rebalance(Object cache) throws Exception {
+ Method getRMMethod = cache.getClass().getMethod("getResourceManager");
+ getRMMethod.setAccessible(true);
+ Object manager = getRMMethod.invoke(cache);
+
+ Method createRebalanceFactoryMethod =
manager.getClass().getMethod("createRebalanceFactory");
+ createRebalanceFactoryMethod.setAccessible(true);
+ Object rebalanceFactory = createRebalanceFactoryMethod.invoke(manager);
+ Method m = rebalanceFactory.getClass().getMethod("start");
+ m.setAccessible(true);
+ Object op = m.invoke(rebalanceFactory);
+
+ // Wait until the rebalance is completex
+ try {
+ Method getResultsMethod = op.getClass().getMethod("getResults");
+ getResultsMethod.setAccessible(true);
+ Object results = getResultsMethod.invoke(op);
+ Method getTotalTimeMethod =
results.getClass().getMethod("getTotalTime");
+ getTotalTimeMethod.setAccessible(true);
+ System.out.println("Took " + getTotalTimeMethod.invoke(results) + "
milliseconds\n");
+ Method getTotalBucketsMethod =
results.getClass().getMethod("getTotalBucketTransferBytes");
+ getTotalBucketsMethod.setAccessible(true);
+ System.out.println("Transfered " +
getTotalBucketsMethod.invoke(results) + "bytes\n");
+ } catch (Exception e) {
+ Thread.currentThread().interrupt();
+ throw e;
+ }
+ }
+
+ /**
+ * Starts a locator with given configuration.
+ *
+ * @param props TODO
--- End diff --
remove todo?
> Add lucene backward compatibility and rolling upgrade dunit tests
> ------------------------------------------------------------------
>
> Key: GEODE-3308
> URL: https://issues.apache.org/jira/browse/GEODE-3308
> Project: Geode
> Issue Type: Test
> Components: lucene
> Reporter: nabarun
> Assignee: nabarun
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)