Repository: incubator-geode Updated Branches: refs/heads/feature/e2e-testing a2ce01d33 -> 75521268a
GEODE-1906: fix misspelling of Successfully fix misspelling of Successfully, organize imports, reformat and cleanup classes Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/331fc171 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/331fc171 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/331fc171 Branch: refs/heads/feature/e2e-testing Commit: 331fc1717f725440c3e23192eeffc0fac09fb1b9 Parents: 85e97c3 Author: Kirk Lund <[email protected]> Authored: Fri Sep 16 11:58:38 2016 -0700 Committer: Kirk Lund <[email protected]> Committed: Thu Sep 22 10:49:49 2016 -0700 ---------------------------------------------------------------------- .../internal/beans/QueryDataFunction.java | 268 ++++++++---------- ...onedRegionHAFailureAndRecoveryDUnitTest.java | 282 +++++++------------ 2 files changed, 214 insertions(+), 336 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/331fc171/geode-core/src/main/java/org/apache/geode/management/internal/beans/QueryDataFunction.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/QueryDataFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/QueryDataFunction.java index b174ddb..bcfbf43 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/QueryDataFunction.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/QueryDataFunction.java @@ -70,41 +70,51 @@ import org.apache.geode.management.internal.cli.json.TypedJson; /** * This function is executed on one or multiple members based on the member * input to DistributedSystemMXBean.queryData() - * - * */ +@SuppressWarnings({ "deprecation", "unchecked" }) public class QueryDataFunction extends FunctionAdapter implements InternalEntity { + private static final long serialVersionUID = 1L; + private static final Logger logger = LogService.getLogger(); - + + private static final String MEMBER_KEY = "member"; + private static final String RESULT_KEY = "result"; + private static final String NO_DATA_FOUND = "No Data Found"; + private static final String QUERY_EXEC_SUCCESS = "Query Executed Successfully"; + private static final int DISPLAY_MEMBERWISE = 0; + private static final int QUERY = 1; + private static final int REGION = 2; + private static final int LIMIT = 3; + private static final int QUERY_RESULTSET_LIMIT = 4; + private static final int QUERY_COLLECTIONS_DEPTH = 5; + private static final String SELECT_EXPR = "\\s*SELECT\\s+.+\\s+FROM\\s+.+"; + private static final Pattern SELECT_EXPR_PATTERN = Pattern.compile(SELECT_EXPR, Pattern.CASE_INSENSITIVE); + private static final String SELECT_WITH_LIMIT_EXPR = "\\s*SELECT\\s+.+\\s+FROM(\\s+|(.*\\s+))LIMIT\\s+[0-9]+.*"; + private static final Pattern SELECT_WITH_LIMIT_EXPR_PATTERN = Pattern.compile(SELECT_WITH_LIMIT_EXPR, Pattern.CASE_INSENSITIVE); + @Override public boolean hasResult() { return true; } - private static final long serialVersionUID = 1L; - @Override - public void execute(FunctionContext context) { + public void execute(final FunctionContext context) { Object[] functionArgs = (Object[]) context.getArguments(); boolean showMember = (Boolean) functionArgs[DISPLAY_MEMBERWISE]; String queryString = (String) functionArgs[QUERY]; String regionName = (String) functionArgs[REGION]; int limit = (Integer) functionArgs[LIMIT]; - + int queryResultSetLimit = (Integer) functionArgs[QUERY_RESULTSET_LIMIT]; - + int queryCollectionsDepth = (Integer) functionArgs[QUERY_COLLECTIONS_DEPTH]; - - + try { - context.getResultSender().lastResult( - selectWithType(context, queryString, showMember, regionName, limit, queryResultSetLimit, - queryCollectionsDepth)); + context.getResultSender().lastResult(selectWithType(context, queryString, showMember, regionName, limit, queryResultSetLimit, queryCollectionsDepth)); } catch (Exception e) { context.getResultSender().sendException(e); } - } @Override @@ -112,30 +122,28 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity return ManagementConstants.QUERY_DATA_FUNCTION; } - @SuppressWarnings( { "unchecked" }) - public QueryDataFunctionResult selectWithType(FunctionContext context, String queryString, boolean showMember, - String regionName, int limit, int queryResultSetLimit, int queryCollectionsDepth) throws Exception { - + private QueryDataFunctionResult selectWithType(final FunctionContext context, + String queryString, + final boolean showMember, + final String regionName, + final int limit, + final int queryResultSetLimit, + final int queryCollectionsDepth) throws Exception { Cache cache = CacheFactory.getAnyInstance(); - - Function loclQueryFunc = new LocalQueryFunction("LocalQueryFunction", regionName, showMember) - .setOptimizeForWrite(true); - + Function loclQueryFunc = new LocalQueryFunction("LocalQueryFunction", regionName, showMember).setOptimizeForWrite(true); queryString = applyLimitClause(queryString, limit, queryResultSetLimit); try { - TypedJson result = new TypedJson(queryCollectionsDepth); Region region = cache.getRegion(regionName); if (region == null) { - throw new Exception(ManagementStrings.QUERY__MSG__REGIONS_NOT_FOUND_ON_MEMBER.toLocalizedString(regionName, - cache.getDistributedSystem().getDistributedMember().getId())); + throw new Exception(ManagementStrings.QUERY__MSG__REGIONS_NOT_FOUND_ON_MEMBER.toLocalizedString(regionName, cache.getDistributedSystem().getDistributedMember().getId())); } Object results = null; - + boolean noDataFound = true; if (region.getAttributes().getDataPolicy() == DataPolicy.NORMAL) { @@ -146,50 +154,46 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity } else { ResultCollector rcollector = null; - + PartitionedRegion parRegion = PartitionedRegionHelper.getPartitionedRegion(regionName, cache); - if(parRegion != null && showMember){ - if(parRegion.isDataStore()){ - + if (parRegion != null && showMember) { + if (parRegion.isDataStore()) { + Set<BucketRegion> localPrimaryBucketRegions = parRegion.getDataStore().getAllLocalPrimaryBucketRegions(); - Set<Integer> localPrimaryBucketSet = new HashSet<Integer>(); + Set<Integer> localPrimaryBucketSet = new HashSet<>(); for (BucketRegion bRegion : localPrimaryBucketRegions) { localPrimaryBucketSet.add(bRegion.getId()); } LocalDataSet lds = new LocalDataSet(parRegion, localPrimaryBucketSet); - DefaultQuery query = (DefaultQuery)cache.getQueryService().newQuery( - queryString); - SelectResults selectResults = (SelectResults)lds.executeQuery(query, null, localPrimaryBucketSet); + DefaultQuery query = (DefaultQuery) cache.getQueryService().newQuery(queryString); + SelectResults selectResults = (SelectResults) lds.executeQuery(query, null, localPrimaryBucketSet); results = selectResults; } - }else{ - rcollector = FunctionService.onRegion(cache.getRegion(regionName)).withArgs(queryString) - .execute(loclQueryFunc); - results = (ArrayList) rcollector.getResult(); + } else { + rcollector = FunctionService.onRegion(cache.getRegion(regionName)).withArgs(queryString).execute(loclQueryFunc); + results = rcollector.getResult(); } - - } if (results != null && results instanceof SelectResults) { SelectResults selectResults = (SelectResults) results; - for (Iterator iter = selectResults.iterator(); iter.hasNext();) { + for (Iterator iter = selectResults.iterator(); iter.hasNext(); ) { Object object = iter.next(); - result.add(RESULT_KEY,object); + result.add(RESULT_KEY, object); noDataFound = false; } } else if (results != null && results instanceof ArrayList) { ArrayList listResults = (ArrayList) results; - ArrayList actualResult = (ArrayList)listResults.get(0); + ArrayList actualResult = (ArrayList) listResults.get(0); for (Object object : actualResult) { result.add(RESULT_KEY, object); noDataFound = false; } - } - + } + if (!noDataFound && showMember) { - result.add(MEMBER_KEY,cache.getDistributedSystem().getDistributedMember().getId()); + result.add(MEMBER_KEY, cache.getDistributedSystem().getDistributedMember().getId()); } if (noDataFound) { @@ -199,27 +203,22 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity } catch (Exception e) { logger.warn(e.getMessage(), e); throw e; - } finally { - } - } - /** * Matches the input query with query with limit pattern. If limit is found in * input query this function ignores. Else it will append a default limit .. * 1000 If input limit is 0 then also it will append default limit of 1000 - * - * @param query - * input query - * @param limit - * limit on the result set + * + * @param query input query + * @param limit limit on the result set + * * @return a string having limit clause */ - static String applyLimitClause(String query, int limit, int queryResultSetLimit) { - - Matcher matcher = SELECT_EXPR_PATTERN.matcher(query); + private static String applyLimitClause(final String query, int limit, final int queryResultSetLimit) { + + Matcher matcher = SELECT_EXPR_PATTERN.matcher(query); if (matcher.matches()) { Matcher limit_matcher = SELECT_WITH_LIMIT_EXPR_PATTERN.matcher(query); @@ -237,14 +236,14 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity return query; } - @SuppressWarnings( { "unchecked" }) - static Object callFunction(Object functionArgs, Set<DistributedMember> members, boolean zipResult) throws Exception { + private static Object callFunction(final Object functionArgs, final Set<DistributedMember> members, final boolean zipResult) throws Exception { try { if (members.size() == 1) { DistributedMember member = members.iterator().next(); - ResultCollector collector = FunctionService.onMember(member).withArgs(functionArgs).execute( - ManagementConstants.QUERY_DATA_FUNCTION); + ResultCollector collector = FunctionService.onMember(member) + .withArgs(functionArgs) + .execute(ManagementConstants.QUERY_DATA_FUNCTION); List list = (List) collector.getResult(); Object object = null; if (list.size() > 0) { @@ -252,8 +251,7 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity } if (object instanceof Throwable) { - Throwable error = (Throwable) object; - throw error; + throw (Throwable) object; } QueryDataFunctionResult result = (QueryDataFunctionResult) object; @@ -263,7 +261,7 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity Object[] functionArgsList = (Object[]) functionArgs; boolean showMember = (Boolean) functionArgsList[DISPLAY_MEMBERWISE]; if (showMember) {// Added to show a single member similar to multiple - // member. + // member. // Note , if no member is selected this is the code path executed. A // random associated member is chosen. List<String> decompressedList = new ArrayList<String>(); @@ -274,14 +272,14 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity } } else { // More than 1 Member - ResultCollector coll = FunctionService.onMembers(members).withArgs(functionArgs).execute( - ManagementConstants.QUERY_DATA_FUNCTION); + ResultCollector coll = FunctionService.onMembers(members) + .withArgs(functionArgs) + .execute(ManagementConstants.QUERY_DATA_FUNCTION); List list = (List) coll.getResult(); Object object = list.get(0); if (object instanceof Throwable) { - Throwable error = (Throwable) object; - throw error; + throw (Throwable) object; } Iterator<QueryDataFunctionResult> it = list.iterator(); @@ -316,7 +314,7 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity } } - static String wrapResult(String str) { + private static String wrapResult(final String str) { StringWriter w = new StringWriter(); synchronized (w.getBuffer()) { w.write("{\"result\":"); @@ -325,10 +323,13 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity return w.toString(); } } - - - public static Object queryData(String query, String members, int limit, boolean zipResult, int queryResultSetLimit, int queryCollectionsDepth) throws Exception { + public static Object queryData(final String query, + final String members, + final int limit, + final boolean zipResult, + final int queryResultSetLimit, + final int queryCollectionsDepth) throws Exception { if (query == null || query.isEmpty()) { return new JsonisedErroMessage(ManagementStrings.QUERY__MSG__QUERY_EMPTY.toLocalizedString()).toString(); @@ -353,29 +354,27 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity SystemManagementService service = (SystemManagementService) ManagementService.getExistingManagementService(cache); Set<String> regionsInQuery = compileQuery(cache, query); - - // Validate region existance + + // Validate region existence if (regionsInQuery.size() > 0) { for (String regionPath : regionsInQuery) { DistributedRegionMXBean regionMBean = service.getDistributedRegionMXBean(regionPath); if (regionMBean == null) { return new JsonisedErroMessage(ManagementStrings.QUERY__MSG__REGIONS_NOT_FOUND.toLocalizedString(regionPath)).toString(); } else { - Set<DistributedMember> associatedMembers = DataCommands - .getRegionAssociatedMembers(regionPath, cache, true); + Set<DistributedMember> associatedMembers = DataCommands.getRegionAssociatedMembers(regionPath, cache, true); if (inputMembers != null && inputMembers.size() > 0) { if (!associatedMembers.containsAll(inputMembers)) { - return new JsonisedErroMessage(ManagementStrings.QUERY__MSG__REGIONS_NOT_FOUND_ON_MEMBERS - .toLocalizedString(regionPath)).toString(); + return new JsonisedErroMessage(ManagementStrings.QUERY__MSG__REGIONS_NOT_FOUND_ON_MEMBERS.toLocalizedString(regionPath)) + .toString(); } - } + } } } } else { - return new JsonisedErroMessage(ManagementStrings.QUERY__MSG__INVALID_QUERY - .toLocalizedString("Region mentioned in query probably missing /")).toString(); + return new JsonisedErroMessage(ManagementStrings.QUERY__MSG__INVALID_QUERY.toLocalizedString("Region mentioned in query probably missing /")).toString(); } // Validate @@ -383,19 +382,16 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity for (String regionPath : regionsInQuery) { DistributedRegionMXBean regionMBean = service.getDistributedRegionMXBean(regionPath); - if (regionMBean.getRegionType().equals(DataPolicy.PARTITION.toString()) - || regionMBean.getRegionType().equals(DataPolicy.PERSISTENT_PARTITION.toString())) { + if (regionMBean.getRegionType().equals(DataPolicy.PARTITION.toString()) || + regionMBean.getRegionType().equals(DataPolicy.PERSISTENT_PARTITION.toString())) { return new JsonisedErroMessage(ManagementStrings.QUERY__MSG__JOIN_OP_EX.toLocalizedString()).toString(); } } } - String randomRegion = regionsInQuery.iterator().next(); - - Set<DistributedMember> associatedMembers = DataCommands.getQueryRegionsAssociatedMembers(regionsInQuery, cache, - false);// First available member + Set<DistributedMember> associatedMembers = DataCommands.getQueryRegionsAssociatedMembers(regionsInQuery, cache, false);// First available member if (associatedMembers != null && associatedMembers.size() > 0) { Object[] functionArgs = new Object[6]; @@ -408,7 +404,7 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity functionArgs[LIMIT] = limit; functionArgs[QUERY_RESULTSET_LIMIT] = queryResultSetLimit; functionArgs[QUERY_COLLECTIONS_DEPTH] = queryCollectionsDepth; - Object result = QueryDataFunction.callFunction(functionArgs, inputMembers, zipResult); + Object result = callFunction(functionArgs, inputMembers, zipResult); return result; } else { // Query on any random member functionArgs[DISPLAY_MEMBERWISE] = false; @@ -417,28 +413,26 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity functionArgs[LIMIT] = limit; functionArgs[QUERY_RESULTSET_LIMIT] = queryResultSetLimit; functionArgs[QUERY_COLLECTIONS_DEPTH] = queryCollectionsDepth; - Object result = QueryDataFunction.callFunction(functionArgs, associatedMembers, zipResult); + Object result = callFunction(functionArgs, associatedMembers, zipResult); return result; } } else { - return new JsonisedErroMessage(ManagementStrings.QUERY__MSG__REGIONS_NOT_FOUND.toLocalizedString(regionsInQuery - .toString())).toString(); + return new JsonisedErroMessage(ManagementStrings.QUERY__MSG__REGIONS_NOT_FOUND.toLocalizedString(regionsInQuery.toString())).toString(); } } catch (QueryInvalidException qe) { return new JsonisedErroMessage(ManagementStrings.QUERY__MSG__INVALID_QUERY.toLocalizedString(qe.getMessage())).toString(); - } + } } - - + private static class JsonisedErroMessage { private static String message = "message"; private GfJsonObject gFJsonObject = new GfJsonObject(); - public JsonisedErroMessage(String errorMessage) throws Exception { + public JsonisedErroMessage(final String errorMessage) throws Exception { try { gFJsonObject.put(message, errorMessage); } catch (GfJsonException e) { @@ -446,25 +440,25 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity } } + @Override public String toString() { return gFJsonObject.toString(); } - + } /** * Compile the query and return a set of regions involved in the query It * throws an QueryInvalidException if the query is not proper - * - * @param cache - * current cache - * @param query - * input query + * + * @param cache current cache + * @param query input query + * * @return a set of regions involved in the query + * * @throws QueryInvalidException */ - @SuppressWarnings("deprecation") - public static Set<String> compileQuery(Cache cache, String query) throws QueryInvalidException { + private static Set<String> compileQuery(final Cache cache, final String query) throws QueryInvalidException { QCompiler compiler = new QCompiler(); Set<String> regionsInQuery = null; try { @@ -482,19 +476,24 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity /** * Function to gather data locally. This function is required to execute query * with region context - * - * */ private class LocalQueryFunction extends FunctionAdapter { private static final long serialVersionUID = 1L; - private boolean optimizeForWrite = false; + private final String id; + private boolean optimizeForWrite = false; private boolean showMembers = false; - private String regionName; + public LocalQueryFunction(final String id, final String regionName, final boolean showMembers) { + super(); + this.id = id; + this.regionName = regionName; + this.showMembers = showMembers; + } + @Override public boolean hasResult() { return true; @@ -505,29 +504,18 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity return false; } - private final String id; - @Override public boolean optimizeForWrite() { return optimizeForWrite; } - public LocalQueryFunction setOptimizeForWrite(boolean optimizeForWrite) { + public LocalQueryFunction setOptimizeForWrite(final boolean optimizeForWrite) { this.optimizeForWrite = optimizeForWrite; return this; } - public LocalQueryFunction(String id, String regionName, boolean showMembers) { - super(); - this.id = id; - this.regionName = regionName; - this.showMembers = showMembers; - - } - - @SuppressWarnings("unchecked") @Override - public void execute(FunctionContext context) { + public void execute(final FunctionContext context) { Cache cache = CacheFactory.getAnyInstance(); QueryService queryService = cache.getQueryService(); String qstr = (String) context.getArguments(); @@ -554,56 +542,22 @@ public class QueryDataFunction extends FunctionAdapter implements InternalEntity } } - private static String MEMBER_KEY = "member"; - - private static String RESULT_KEY = "result"; - - private static String NO_DATA_FOUND = "No Data Found"; - - private static String QUERY_EXEC_SUCCESS = "Query Executed Successfuly"; - - private static int DISPLAY_MEMBERWISE = 0; - - private static int QUERY = 1; - - private static int REGION = 2; - - private static int LIMIT = 3; - - private static int QUERY_RESULTSET_LIMIT = 4; - - private static int QUERY_COLLECTIONS_DEPTH = 5; - - static final String SELECT_EXPR = "\\s*SELECT\\s+.+\\s+FROM\\s+.+"; - - static Pattern SELECT_EXPR_PATTERN = Pattern.compile(SELECT_EXPR, Pattern.CASE_INSENSITIVE); - - static final String SELECT_WITH_LIMIT_EXPR = "\\s*SELECT\\s+.+\\s+FROM(\\s+|(.*\\s+))LIMIT\\s+[0-9]+.*"; - - static Pattern SELECT_WITH_LIMIT_EXPR_PATTERN = Pattern.compile(SELECT_WITH_LIMIT_EXPR, Pattern.CASE_INSENSITIVE); - + private static class QueryDataFunctionResult implements Serializable { - public static class QueryDataFunctionResult implements Serializable { private static final long serialVersionUID = 1L; private final String message; private final byte[] compressedBytes; - public QueryDataFunctionResult(String message, byte[] compressedBytes) { + public QueryDataFunctionResult(final String message, final byte[] compressedBytes) { this.message = message; this.compressedBytes = compressedBytes; } - /** - * @return the message - */ public String getMessage() { return message; } - /** - * @return the compressedBytes - */ public byte[] getCompressedBytes() { return compressedBytes; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/331fc171/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionHAFailureAndRecoveryDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionHAFailureAndRecoveryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionHAFailureAndRecoveryDUnitTest.java index 4e86b12..1f9d3bde 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionHAFailureAndRecoveryDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionHAFailureAndRecoveryDUnitTest.java @@ -16,21 +16,17 @@ */ package org.apache.geode.internal.cache; -import org.junit.experimental.categories.Category; -import org.junit.Test; - import static org.junit.Assert.*; -import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; -import org.apache.geode.test.junit.categories.DistributedTest; - import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.experimental.categories.Category; + import org.apache.geode.CancelException; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheException; @@ -50,6 +46,7 @@ import org.apache.geode.test.dunit.SerializableCallable; import org.apache.geode.test.dunit.SerializableRunnable; import org.apache.geode.test.dunit.ThreadUtils; import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.junit.categories.DistributedTest; /** * This is a Dunit test for PartitionedRegion cleanup on Node Failure through @@ -58,25 +55,14 @@ import org.apache.geode.test.dunit.VM; * metadata cleanup for single failed node.</br> (2) * testMetaDataCleanupOnMultiplePRNodeFail - Test for PartitionedRegion metadata * cleanup for multiple failed nodes.</br> - * */ @Category(DistributedTest.class) -public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends - PartitionedRegionDUnitTestCase -{ - - /** to store references of 4 vms */ - VM vmArr[] = new VM[4]; +public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends PartitionedRegionDUnitTestCase { /** - * Constructor for PartitionedRegionHAFailureAndRecoveryDUnitTest. - * - * @param name + * to store references of 4 vms */ - public PartitionedRegionHAFailureAndRecoveryDUnitTest() { - - super(); - } + private VM vmArr[] = new VM[4]; /** * Test for PartitionedRegion metadata cleanup for single node failure. <br> @@ -87,10 +73,8 @@ public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends * (4) Validate Failed node's config metadata </br> <br> * (5) Validate Failed node's bucket2Node Region metadata. </br> */ - @Test - public void testMetaDataCleanupOnSinglePRNodeFail() throws Throwable - { + public void testMetaDataCleanupOnSinglePRNodeFail() throws Exception { // create the VM's createVMs(); // create the partitionedRegion on diffrent nodes. @@ -98,97 +82,76 @@ public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends final int endIndexForRegion = 4; final int localMaxMemory = 200; final int redundancy = 1; - createPartitionRegionAsynch("testMetaDataCleanupOnSinglePRNodeFail_", - startIndexForRegion, endIndexForRegion, localMaxMemory, redundancy, -1); - LogWriterUtils.getLogWriter() - .info( - "testMetaDataCleanupOnSinglePRNodeFail() - PartitionedRegion's created at all VM nodes"); - + createPartitionRegionAsynch("testMetaDataCleanupOnSinglePRNodeFail_", startIndexForRegion, endIndexForRegion, localMaxMemory, redundancy, -1); + LogWriterUtils.getLogWriter().info("testMetaDataCleanupOnSinglePRNodeFail() - PartitionedRegion's created at all VM nodes"); + // Add a listener to the config meta data addConfigListeners(); // disconnect vm0. - DistributedMember dsMember = (DistributedMember)vmArr[0].invoke(this, "disconnectMethod"); + DistributedMember dsMember = (DistributedMember) vmArr[0].invoke(() -> disconnectMethod()); + + LogWriterUtils.getLogWriter().info("testMetaDataCleanupOnSinglePRNodeFail() - VM = " + dsMember + " disconnected from the distributed system "); - LogWriterUtils.getLogWriter().info( - "testMetaDataCleanupOnSinglePRNodeFail() - VM = " + dsMember - + " disconnected from the distributed system "); - // validate that the metadata clean up is done at all the VM's. vmArr[1].invoke(validateNodeFailMetaDataCleanUp(dsMember)); vmArr[2].invoke(validateNodeFailMetaDataCleanUp(dsMember)); vmArr[3].invoke(validateNodeFailMetaDataCleanUp(dsMember)); - LogWriterUtils.getLogWriter() - .info( - "testMetaDataCleanupOnSinglePRNodeFail() - Validation of Failed node config metadata complete"); + LogWriterUtils.getLogWriter().info("testMetaDataCleanupOnSinglePRNodeFail() - Validation of Failed node config metadata complete"); // validate that bucket2Node clean up is done at all the VM's. vmArr[1].invoke(validateNodeFailbucket2NodeCleanUp(dsMember)); vmArr[2].invoke(validateNodeFailbucket2NodeCleanUp(dsMember)); vmArr[3].invoke(validateNodeFailbucket2NodeCleanUp(dsMember)); - LogWriterUtils.getLogWriter() - .info( - "testMetaDataCleanupOnSinglePRNodeFail() - Validation of Failed node bucket2Node Region metadata complete"); + LogWriterUtils.getLogWriter().info("testMetaDataCleanupOnSinglePRNodeFail() - Validation of Failed node bucket2Node Region metadata complete"); - LogWriterUtils.getLogWriter() - .info( - "testMetaDataCleanupOnSinglePRNodeFail() Completed Successfuly .........."); + LogWriterUtils.getLogWriter().info("testMetaDataCleanupOnSinglePRNodeFail() Completed Successfully .........."); } - private void addConfigListeners() - { - + private void addConfigListeners() { + final SerializableRunnable addListener = new SerializableRunnable("add PRConfig listener") { private static final long serialVersionUID = 1L; - public void run() - { + + public void run() { Cache c = getCache(); Region rootReg = PartitionedRegionHelper.getPRRoot(c); -// Region allPRs = PartitionedRegionHelper.getPRConfigRegion(rootReg, c); rootReg.getAttributesMutator().addCacheListener(new CertifiableTestCacheListener(LogWriterUtils.getLogWriter())); } }; - + for (int count = 0; count < this.vmArr.length; count++) { VM vm = this.vmArr[count]; vm.invoke(addListener); } } - - private void clearConfigListenerState(VM[] vmsToClear) - { + + private void clearConfigListenerState(VM[] vmsToClear) { final SerializableRunnable clearListener = new SerializableRunnable("clear the listener state") { private static final long serialVersionUID = 1L; - public void run() - { + + public void run() { try { Cache c = getCache(); Region rootReg = PartitionedRegionHelper.getPRRoot(c); -// Region allPRs = PartitionedRegionHelper.getPRConfigRegion(rootReg, c); CacheListener[] cls = rootReg.getAttributes().getCacheListeners(); assertEquals(2, cls.length); CertifiableTestCacheListener ctcl = (CertifiableTestCacheListener) cls[1]; ctcl.clearState(); - } - catch (CancelException possible) { + } catch (CancelException possible) { // If a member has been disconnected, we may get a CancelException // in which case the config listener state has been cleared (in a big way) } } }; - + for (int count = 0; count < vmsToClear.length; count++) { VM vm = vmsToClear[count]; vm.invoke(clearListener); } } - static private final String WAIT_PROPERTY = - "PartitionedRegionHAFailureAndRecoveryDUnitTest.maxWaitTime"; - static private final int WAIT_DEFAULT = 10000; - - /** * Test for PartitionedRegion metadata cleanup for multiple node failure. <br> * <u>This test does the following:<u></br> <br> @@ -199,8 +162,7 @@ public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends * (5) Validate all Failed node's bucket2Node Region metadata. </br> */ @Test - public void testMetaDataCleanupOnMultiplePRNodeFail() throws Throwable - { + public void testMetaDataCleanupOnMultiplePRNodeFail() throws Exception { // create the VM's createVMs(); // create the partitionedRegion on diffrent nodes. @@ -208,53 +170,37 @@ public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends final int endIndexForRegion = 4; final int localMaxMemory = 200; final int redundancy = 1; - createPartitionRegionAsynch("testMetaDataCleanupOnMultiplePRNodeFail_", - startIndexForRegion, endIndexForRegion, localMaxMemory, redundancy, -1); - LogWriterUtils.getLogWriter() - .info( - "testMetaDataCleanupOnMultiplePRNodeFail() - PartitionedRegion's created at all VM nodes"); - + createPartitionRegionAsynch("testMetaDataCleanupOnMultiplePRNodeFail_", startIndexForRegion, endIndexForRegion, localMaxMemory, redundancy, -1); + LogWriterUtils.getLogWriter().info("testMetaDataCleanupOnMultiplePRNodeFail() - PartitionedRegion's created at all VM nodes"); + addConfigListeners(); // disconnect vm0 - DistributedMember dsMember = (DistributedMember)vmArr[0].invoke(this, "disconnectMethod"); + DistributedMember dsMember = (DistributedMember) vmArr[0].invoke(() -> disconnectMethod()); - LogWriterUtils.getLogWriter().info( - "testMetaDataCleanupOnMultiplePRNodeFail() - VM = " + dsMember - + " disconnected from the distributed system "); + LogWriterUtils.getLogWriter().info("testMetaDataCleanupOnMultiplePRNodeFail() - VM = " + dsMember + " disconnected from the distributed system "); // validate that the metadata clean up is done at all the VM's for first // failed node. vmArr[1].invoke(validateNodeFailMetaDataCleanUp(dsMember)); vmArr[2].invoke(validateNodeFailMetaDataCleanUp(dsMember)); vmArr[3].invoke(validateNodeFailMetaDataCleanUp(dsMember)); - + // validate that bucket2Node clean up is done at all the VM's for all failed // nodes. vmArr[1].invoke(validateNodeFailbucket2NodeCleanUp(dsMember)); vmArr[2].invoke(validateNodeFailbucket2NodeCleanUp(dsMember)); vmArr[3].invoke(validateNodeFailbucket2NodeCleanUp(dsMember)); - + // Clear state of listener, skipping the vmArr[0] which was disconnected - VM[] vmsToClear = new VM[] {vmArr[1], vmArr[2], vmArr[3]}; + VM[] vmsToClear = new VM[] { vmArr[1], vmArr[2], vmArr[3] }; clearConfigListenerState(vmsToClear); // disconnect vm1 - DistributedMember dsMember2 = (DistributedMember)vmArr[1].invoke(this, "disconnectMethod"); - - LogWriterUtils.getLogWriter().info( - "testMetaDataCleanupOnMultiplePRNodeFail() - VM = " + dsMember2 - + " disconnected from the distributed system "); - -// Thread.sleep(5000); -// final int maxWaitTime = Integer.getInteger(WAIT_PROPERTY, WAIT_DEFAULT).intValue(); -// try { -// Thread.sleep(maxWaitTime); -// } -// catch (InterruptedException e) { -// fail("interrupted"); -// } - + DistributedMember dsMember2 = (DistributedMember) vmArr[1].invoke(() -> disconnectMethod()); + + LogWriterUtils.getLogWriter().info("testMetaDataCleanupOnMultiplePRNodeFail() - VM = " + dsMember2 + " disconnected from the distributed system "); + // validate that the metadata clean up is done at all the VM's for first // failed node. vmArr[2].invoke(validateNodeFailMetaDataCleanUp(dsMember)); @@ -265,87 +211,70 @@ public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends vmArr[2].invoke(validateNodeFailMetaDataCleanUp(dsMember2)); vmArr[3].invoke(validateNodeFailMetaDataCleanUp(dsMember2)); - LogWriterUtils.getLogWriter() - .info( - "testMetaDataCleanupOnMultiplePRNodeFail() - Validation of Failed nodes config metadata complete"); + LogWriterUtils.getLogWriter().info("testMetaDataCleanupOnMultiplePRNodeFail() - Validation of Failed nodes config metadata complete"); vmArr[2].invoke(validateNodeFailbucket2NodeCleanUp(dsMember2)); vmArr[3].invoke(validateNodeFailbucket2NodeCleanUp(dsMember2)); - LogWriterUtils.getLogWriter() - .info( - "testMetaDataCleanupOnMultiplePRNodeFail() - Validation of Failed nodes bucket2Node Region metadata complete"); + LogWriterUtils.getLogWriter().info("testMetaDataCleanupOnMultiplePRNodeFail() - Validation of Failed nodes bucket2Node Region metadata complete"); - LogWriterUtils.getLogWriter() - .info( - "testMetaDataCleanupOnMultiplePRNodeFail() Completed Successfuly .........."); + LogWriterUtils.getLogWriter().info("testMetaDataCleanupOnMultiplePRNodeFail() Completed Successfully .........."); } /** * Returns CacheSerializableRunnable to validate the Failed node config * metadata. * - * @param dsMember - * Failed DistributedMember + * @param dsMember Failed DistributedMember + * * @return CacheSerializableRunnable */ - - public CacheSerializableRunnable validateNodeFailMetaDataCleanUp( - final DistributedMember dsMember) - { - SerializableRunnable validator = new CacheSerializableRunnable( - "validateNodeFailMetaDataCleanUp") { - public void run2() throws CacheException - { + private CacheSerializableRunnable validateNodeFailMetaDataCleanUp(final DistributedMember dsMember) { + SerializableRunnable validator = new CacheSerializableRunnable("validateNodeFailMetaDataCleanUp") { + public void run2() throws CacheException { Cache cache = getCache(); Region rootReg = PartitionedRegionHelper.getPRRoot(cache); -// Region allPRs = PartitionedRegionHelper.getPRConfigRegion(rootReg, cache); CacheListener[] cls = rootReg.getAttributes().getCacheListeners(); assertEquals(2, cls.length); CertifiableTestCacheListener ctcl = (CertifiableTestCacheListener) cls[1]; - - LogWriterUtils.getLogWriter().info("Listener update (" + ctcl.updates.size() + "): " + ctcl.updates) ; - LogWriterUtils.getLogWriter().info("Listener destroy: (" + ctcl.destroys.size() + "): " + ctcl.destroys) ; + + LogWriterUtils.getLogWriter().info("Listener update (" + ctcl.updates.size() + "): " + ctcl.updates); + LogWriterUtils.getLogWriter().info("Listener destroy: (" + ctcl.destroys.size() + "): " + ctcl.destroys); Iterator itrator = rootReg.keySet().iterator(); - for (Iterator itr = itrator; itr.hasNext();) { - String prName = (String)itr.next(); + for (Iterator itr = itrator; itr.hasNext(); ) { + String prName = (String) itr.next(); ctcl.waitForUpdated(prName); Object obj = rootReg.get(prName); if (obj != null) { - PartitionRegionConfig prConf = (PartitionRegionConfig)obj; + PartitionRegionConfig prConf = (PartitionRegionConfig) obj; Set<Node> nodeList = prConf.getNodes(); Iterator itr2 = nodeList.iterator(); while (itr2.hasNext()) { - DistributedMember member = ((Node)itr2.next()).getMemberId(); + DistributedMember member = ((Node) itr2.next()).getMemberId(); if (member.equals(dsMember)) { - fail("Failed DistributedMember's = " + member - + " global meta data not cleared. For PR Region = " - + prName); + fail("Failed DistributedMember's = " + member + " global meta data not cleared. For PR Region = " + prName); } } } } } }; - return (CacheSerializableRunnable)validator; + return (CacheSerializableRunnable) validator; } /** - * Returns CacheSerializableRunnable to validate the Failed node bucket2Node metadata. + * Returns CacheSerializableRunnable to validate the Failed node bucket2Node metadata. + * * @param dsMember Failed DistributedMember + * * @return CacheSerializableRunnable */ + private CacheSerializableRunnable validateNodeFailbucket2NodeCleanUp(final DistributedMember dsMember) { + SerializableRunnable createPRs = new CacheSerializableRunnable("validateNodeFailbucket2NodeCleanUp") { - public CacheSerializableRunnable validateNodeFailbucket2NodeCleanUp( - final DistributedMember dsMember) - { - SerializableRunnable createPRs = new CacheSerializableRunnable( - "validateNodeFailbucket2NodeCleanUp") { - - public void run2() throws CacheException - { + public void run2() throws CacheException { getCache(); Map prIDmap = PartitionedRegion.prIdToPR; Iterator itr = prIDmap.values().iterator(); @@ -357,76 +286,72 @@ public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends PartitionedRegion prRegion = (PartitionedRegion) o; Iterator bukI = prRegion.getRegionAdvisor().getBucketSet().iterator(); - while(bukI.hasNext()) { + while (bukI.hasNext()) { Integer bucketId = (Integer) bukI.next(); Set bucketOwners = prRegion.getRegionAdvisor().getBucketOwners(bucketId.intValue()); if (bucketOwners.contains(dsMember)) { - fail("Failed DistributedMember's = " + dsMember - + " bucket [" + prRegion.bucketStringForLogs(bucketId.intValue()) - + "] meta-data not cleared for partitioned region " + prRegion); + fail("Failed DistributedMember's = " + dsMember + " bucket [" + prRegion.bucketStringForLogs(bucketId.intValue()) + "] meta-data not cleared for partitioned region " + prRegion); } } } } }; - return (CacheSerializableRunnable)createPRs; + return (CacheSerializableRunnable) createPRs; } - /** - * Function to create 4 Vms on a given host. + * Function to create 4 Vms on a given host. */ - private void createVMs() - { + private void createVMs() { Host host = Host.getHost(0); for (int i = 0; i < 4; i++) { vmArr[i] = host.getVM(i); } } + /** * Function for disconnecting a member from distributed system. + * * @return Disconnected DistributedMember */ - public DistributedMember disconnectMethod() - { - DistributedMember dsMember = ((InternalDistributedSystem)getCache() - .getDistributedSystem()).getDistributionManager().getId(); + private DistributedMember disconnectMethod() { + DistributedMember dsMember = ((InternalDistributedSystem) getCache().getDistributedSystem()).getDistributionManager() + .getId(); getCache().getDistributedSystem().disconnect(); LogWriterUtils.getLogWriter().info("disconnectMethod() completed .."); return dsMember; } - + /** * This function creates multiple partition regions on specified nodes. */ private void createPartitionRegionAsynch(final String regionPrefix, - final int startIndexForRegion, final int endIndexForRegion, - final int localMaxMemory, final int redundancy, final int recoveryDelay) throws Throwable - { + final int startIndexForRegion, + final int endIndexForRegion, + final int localMaxMemory, + final int redundancy, + final int recoveryDelay) throws Exception { final AsyncInvocation[] async = new AsyncInvocation[vmArr.length]; for (int count = 0; count < vmArr.length; count++) { VM vm = vmArr[count]; - async[count] = vm.invokeAsync(getCreateMultiplePRregion(regionPrefix, endIndexForRegion, - redundancy, localMaxMemory, recoveryDelay)); + async[count] = vm.invokeAsync(getCreateMultiplePRregion(regionPrefix, endIndexForRegion, redundancy, localMaxMemory, recoveryDelay)); } for (int count2 = 0; count2 < async.length; count2++) { - ThreadUtils.join(async[count2], 30 * 1000); - } - + ThreadUtils.join(async[count2], 30 * 1000); + } + for (int count2 = 0; count2 < async.length; count2++) { if (async[count2].exceptionOccurred()) { Assert.fail("exception during " + count2, async[count2].getException()); } - } + } } - + /** - * Test for peer recovery of buckets when a member is removed from the distributed system - * @throws Throwable + * Test for peer recovery of buckets when a member is removed from the distributed system */ @Test - public void testRecoveryOfSingleMemberFailure() throws Throwable - { + public void testRecoveryOfSingleMemberFailure() throws Exception { final String uniqName = getUniqueName(); // create the VM's createVMs(); @@ -434,15 +359,14 @@ public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends final int numRegions = 1; // Create PR on man VMs - createPartitionRegionAsynch(uniqName, 0, numRegions, 20, redundantCopies, 0); + createPartitionRegionAsynch(uniqName, 0, numRegions, 20, redundantCopies, 0); // Create some buckets, pick one and get one of the members hosting it - final DistributedMember bucketHost = - (DistributedMember) this.vmArr[0].invoke(new SerializableCallable("Populate PR-" + getUniqueName()) { + final DistributedMember bucketHost = (DistributedMember) this.vmArr[0].invoke(new SerializableCallable("Populate PR-" + getUniqueName()) { public Object call() throws Exception { PartitionedRegion r = (PartitionedRegion) getCache().getRegion(uniqName + "0"); // Create some buckets - int i=0; + int i = 0; final int bucketTarget = 2; while (r.getRegionAdvisor().getBucketSet().size() < bucketTarget) { if (i > r.getTotalNumberOfBuckets()) { @@ -451,7 +375,7 @@ public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends Object k = new Integer(i++); r.put(k, k.toString()); } - + // Grab a bucket id Integer bucketId = r.getRegionAdvisor().getBucketSet().iterator().next(); assertNotNull(bucketId); @@ -461,12 +385,13 @@ public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends assertEquals(bucketOwners.size(), redundantCopies + 1); DistributedMember bucketOwner = (DistributedMember) bucketOwners.iterator().next(); assertNotNull(bucketOwner); - LogWriterUtils.getLogWriter().info("Selected distributed member " + bucketOwner + " to disconnect because it hosts bucketId " + bucketId); + LogWriterUtils.getLogWriter() + .info("Selected distributed member " + bucketOwner + " to disconnect because it hosts bucketId " + bucketId); return bucketOwner; } }); assertNotNull(bucketHost); - + // Disconnect the selected host Map stillHasDS = Invoke.invokeInEveryVM(new SerializableCallable("Disconnect provided bucketHost") { public Object call() throws Exception { @@ -478,24 +403,24 @@ public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends return Boolean.TRUE; } }); - + // Wait for each PR instance on each VM to finish recovery of redundancy // for the selected bucket final int MAX_SECONDS_TO_WAIT = 120; for (int count = 0; count < vmArr.length; count++) { VM vm = vmArr[count]; // only wait on the remaining VMs (prevent creating a new distributed system on the disconnected VM) - if (((Boolean)stillHasDS.get(vm)).booleanValue()) { + if (((Boolean) stillHasDS.get(vm)).booleanValue()) { vm.invoke(new SerializableRunnable("Wait for PR region recovery") { public void run() { - for (int i=0; i<numRegions; i++) { + for (int i = 0; i < numRegions; i++) { Region r = getCache().getRegion(uniqName + i); assertTrue(r instanceof PartitionedRegion); PartitionedRegion pr = (PartitionedRegion) r; PartitionedRegionStats prs = pr.getPrStats(); // Wait for recovery final long start = NanoTimer.getTime(); - for(;;) { + for (; ; ) { if (prs.getLowRedundancyBucketCount() == 0) { break; // buckets have been recovered from this VM's point of view } @@ -504,8 +429,7 @@ public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends } try { TimeUnit.MILLISECONDS.sleep(250); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { Assert.fail("Interrupted, ah!", e); } } @@ -514,16 +438,16 @@ public class PartitionedRegionHAFailureAndRecoveryDUnitTest extends }); } } // VM loop - + // Validate all buckets have proper redundancy for (int count = 0; count < vmArr.length; count++) { VM vm = vmArr[count]; // only validate buckets on remaining VMs // (prevent creating a new distributed system on the disconnected VM) - if (((Boolean)stillHasDS.get(vm)).booleanValue()) { + if (((Boolean) stillHasDS.get(vm)).booleanValue()) { vm.invoke(new SerializableRunnable("Validate all bucket redundancy") { public void run() { - for (int i=0; i<numRegions; i++) { // region loop + for (int i = 0; i < numRegions; i++) { // region loop PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(uniqName + i); Iterator bucketIdsWithStorage = pr.getRegionAdvisor().getBucketSet().iterator(); while (bucketIdsWithStorage.hasNext()) { // bucketId loop
