GEODE-1055 Remove unused/dead code from PartitionedRegionQueryEvaluator
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f486b700 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f486b700 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f486b700 Branch: refs/heads/feature/GEODE-949-2 Commit: f486b700c489dd933b98be4be398143ebed794c1 Parents: b8d4db2 Author: Jason Huynh <[email protected]> Authored: Thu Jan 21 16:35:15 2016 -0800 Committer: Jason Huynh <[email protected]> Committed: Thu Mar 10 11:55:22 2016 -0800 ---------------------------------------------------------------------- .../cache/PartitionedRegionQueryEvaluator.java | 391 +------------------ 1 file changed, 6 insertions(+), 385 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f486b700/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java index db40af3..1d6cf0e 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java @@ -26,7 +26,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -39,7 +38,6 @@ import java.util.concurrent.LinkedBlockingQueue; import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.CopyHelper; -import com.gemstone.gemfire.InternalGemFireError; import com.gemstone.gemfire.SystemFailure; import com.gemstone.gemfire.cache.query.QueryException; import com.gemstone.gemfire.cache.query.QueryExecutionLowMemoryException; @@ -47,12 +45,6 @@ import com.gemstone.gemfire.cache.query.QueryInvocationTargetException; import com.gemstone.gemfire.cache.query.SelectResults; import com.gemstone.gemfire.cache.query.Struct; import com.gemstone.gemfire.cache.query.internal.CompiledGroupBySelect; -import com.gemstone.gemfire.cache.query.internal.CompiledID; -import com.gemstone.gemfire.cache.query.internal.CompiledIndexOperation; -import com.gemstone.gemfire.cache.query.internal.CompiledIteratorDef; -import com.gemstone.gemfire.cache.query.internal.CompiledLiteral; -import com.gemstone.gemfire.cache.query.internal.CompiledOperation; -import com.gemstone.gemfire.cache.query.internal.CompiledPath; import com.gemstone.gemfire.cache.query.internal.CompiledSelect; import com.gemstone.gemfire.cache.query.internal.CompiledSortCriterion; import com.gemstone.gemfire.cache.query.internal.CompiledValue; @@ -60,23 +52,16 @@ import com.gemstone.gemfire.cache.query.internal.CumulativeNonDistinctResults; import com.gemstone.gemfire.cache.query.internal.DefaultQuery; import com.gemstone.gemfire.cache.query.internal.DefaultQueryService; import com.gemstone.gemfire.cache.query.internal.ExecutionContext; -import com.gemstone.gemfire.cache.query.internal.CumulativeNonDistinctResults.Metadata; import com.gemstone.gemfire.cache.query.internal.IndexTrackingQueryObserver.IndexInfo; import com.gemstone.gemfire.cache.query.internal.NWayMergeResults; import com.gemstone.gemfire.cache.query.internal.OrderByComparator; import com.gemstone.gemfire.cache.query.internal.PRQueryTraceInfo; import com.gemstone.gemfire.cache.query.internal.QueryExecutionContext; import com.gemstone.gemfire.cache.query.internal.QueryMonitor; -import com.gemstone.gemfire.cache.query.internal.ResultsBag; import com.gemstone.gemfire.cache.query.internal.ResultsSet; -import com.gemstone.gemfire.cache.query.internal.RuntimeIterator; import com.gemstone.gemfire.cache.query.internal.SortedResultsBag; import com.gemstone.gemfire.cache.query.internal.SortedStructBag; -import com.gemstone.gemfire.cache.query.internal.StructBag; -import com.gemstone.gemfire.cache.query.internal.StructImpl; import com.gemstone.gemfire.cache.query.internal.StructSet; -import com.gemstone.gemfire.cache.query.internal.parse.OQLLexerTokenTypes; -import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl; import com.gemstone.gemfire.cache.query.internal.utils.PDXUtils; import com.gemstone.gemfire.cache.query.types.ObjectType; import com.gemstone.gemfire.cache.query.types.StructType; @@ -93,8 +78,6 @@ import com.gemstone.gemfire.internal.cache.partitioned.QueryMessage; import com.gemstone.gemfire.internal.cache.partitioned.StreamingPartitionOperation; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.pdx.PdxInstance; -import com.gemstone.gemfire.pdx.internal.PdxString; /** * This class sends the query on various <code>PartitionedRegion</code> data @@ -109,41 +92,8 @@ import com.gemstone.gemfire.pdx.internal.PdxString; public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation { private static final Logger logger = LogService.getLogger(); - - /** - * @author Mitch Thomas - * An ArraList which can be tainted - * @since 6.0 - */ - public static class TaintableArrayList extends ArrayList { - private boolean isPoison = false; - public synchronized void taint() { - this.isPoison = true; - super.clear(); - } - public boolean add(Object arg0) { - synchronized(this) { - if (this.isPoison) { - return false; - } else { - return super.add(arg0); - } - } - } - public synchronized boolean isConsumable() { - return !this.isPoison && size() > 0; - } - public synchronized boolean isTainted() { - return this.isPoison; - } - - public synchronized void untaint() { - this.isPoison = false; - } - } - /** - * An ArraList which might be unconsumable. + * An ArrayList which might be unconsumable. * @since 6.6.2 * @author shobhit */ @@ -221,22 +171,6 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation protected DistributionMessage createRequestMessage(InternalDistributedMember recipient, ReplyProcessor21 processor, List bucketIds) { return new QueryMessage(recipient, this.pr.getPRId(), processor, this.query, this.parameters, bucketIds); } - - - @Override - public Set<InternalDistributedMember> getPartitionedDataFrom(Set recipients) - throws com.gemstone.gemfire.cache.TimeoutException, InterruptedException, QueryException, ForceReattemptException { - if (Thread.interrupted()) throw new InterruptedException(); - if (recipients.isEmpty()) - return Collections.emptySet(); - - StreamingQueryPartitionResponse processor = new StreamingQueryPartitionResponse(this.sys, recipients); - DistributionMessage m = createRequestMessage(recipients, processor); - this.sys.getDistributionManager().putOutgoing(m); - // should we allow this to timeout? - Set<InternalDistributedMember> failedMembers = processor.waitForCacheOrQueryException(); - return failedMembers; - } /** @@ -311,23 +245,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation ((MemberResultsList) results).setLastChunkReceived(true); } } - - //this.resultsPerMember.putIfAbsent(sender, objects); - /* - boolean toContinue = true; - for (Iterator itr = objects.iterator(); itr.hasNext();) { - final Object o = itr.next(); - if (o instanceof PRQueryProcessor.EndOfBucket) { - int bucketId = ((PRQueryProcessor.EndOfBucket)o).getBucketId(); - synchronized (this.successfulBuckets) { - this.successfulBuckets.add(bucketId); - } - } - else { - saveDataForMember(o, sender); - } - } - */ + return true; } @@ -559,7 +477,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation } break; } - Map b2n = buildNodeToBucketMapForBuckets(caclulateRetryBuckets()); + Map b2n = buildNodeToBucketMapForBuckets(calculateRetryBuckets()); if (th != null) { th.hook(2); } @@ -576,39 +494,15 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation th.hook(3); } } - // the failed buckets are those in this.bucketsToQuery that are - // not present in this.successfulBuckets - /* - synchronized (this.successfulBuckets) { - this.bucketsToQuery.removeAll(this.successfulBuckets.toArray()); - this.successfulBuckets.clear(); - } - - */ + if (needsRetry) { String msg = "Failed to query all the partitioned region " + "dataset (buckets) after " + retry + " attempts."; if (isDebugEnabled) { - logger.debug("{} Unable to query some of the buckets from the set :{}", msg, this.caclulateRetryBuckets()); + logger.debug("{} Unable to query some of the buckets from the set :{}", msg, this.calculateRetryBuckets()); } throw new QueryException(msg); - - /* - if (anyOfTheseBucketsHasStorage(this.bucketsToQuery)) { - if (retry >= MAX_PR_QUERY_RETRIES) { - String msg = "Query failed to get all results after " + retry + " attempts"; - throw new QueryException(msg); - } else { - failMissingBuckets(); - } - } else { - String msg = "Data loss detected during query " - + this.query.getQueryString() - + " subsequent query results should be suspect."; - throw new QueryException(msg); - } - */ } return addResultsToResultSet(); @@ -629,44 +523,8 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation } } } - - private boolean anyOfTheseBucketsHasStorage(Set<Integer> failedBuckets) { - boolean haveStorage = false; - for (Integer bid : failedBuckets) { - if (this.pr.getRegionAdvisor().isStorageAssignedForBucket(bid)) { - Set ownrs = this.pr.getRegionAdvisor().getBucketOwners(bid); - for (Iterator boi = ownrs.iterator(); boi.hasNext(); ) { - InternalDistributedMember mem = (InternalDistributedMember)boi.next(); - TaintableArrayList tal = (TaintableArrayList)this.resultsPerMember.get(mem); - if (tal == null || !tal.isTainted()) { - haveStorage = true; - } - } - } - } - return haveStorage; - - /* - boolean haveStorage = false; - for (Iterator i = failedBuckets.iterator(); i.hasNext(); ) { - final Integer bid = i.next(); - if (this.pr.getRegionAdvisor().isStorageAssignedForBucket(bid)) { - Set ownrs = this.pr.getRegionAdvisor().getBucketOwners(bid); - for (Iterator boi = ownrs.iterator(); boi.hasNext(); ) { - InternalDistributedMember mem = (InternalDistributedMember)boi.next(); - TaintableArrayList tal = (TaintableArrayList)this.resultsPerMember.get(mem); - if (tal == null || !tal.isTainted()) { - haveStorage = true; - } - } - } - } - return haveStorage; - */ - } - - private Set<Integer> caclulateRetryBuckets() { + private Set<Integer> calculateRetryBuckets() { Iterator<Map.Entry<InternalDistributedMember,List<Integer>>> memberToBucketList = node2bucketIds.entrySet().iterator(); final HashSet<Integer> retryBuckets = new HashSet<Integer>(); while (memberToBucketList.hasNext()) { @@ -955,74 +813,6 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation } - //returns attribute with escape quotes #51085 and #51886 - private String checkReservedKeyword(String attr) { - if (attr != null && attr.length() > 0 && attr.contains(".")) { - String[] splits = attr.split("[.]"); - StringBuffer sb = new StringBuffer(); - for (int i = 0; i < splits.length; i++) { - sb.append(checkReservedKeyword(splits[i]) + "."); - } - - if (sb.length() <= 1) { - attr = sb.toString(); - } - else { - attr = sb.substring(0, sb.length() - 1); - } - } - else if(DefaultQuery.reservedKeywords.contains(attr.toLowerCase())) { - attr = "\"" + attr + "\""; - } - return attr; - } - - /** - * This returns the query clause as represented in the application query. - * E.g.: returns p.status, p.getStatus() as represented by passed compiledValue. - */ - private String getQueryAttributes(CompiledValue cv, StringBuffer fromPath) throws QueryException { - // field with multiple level like p.pos.secId - String clause = ""; - if (cv.getType() == OQLLexerTokenTypes.Identifier) { - // It will be p.pos.secId - clause = ((CompiledID)cv).getId() + clause; - } else { - do { - if (cv.getType() == CompiledPath.PATH || cv.getType() == OQLLexerTokenTypes.TOK_LBRACK) { - if (cv.getType() == OQLLexerTokenTypes.TOK_LBRACK) { - CompiledIndexOperation cio = (CompiledIndexOperation)cv; - CompiledLiteral cl = (CompiledLiteral)cio.getExpression(); - StringBuffer sb = new StringBuffer(); - cl.generateCanonicalizedExpression(sb, null); - cv = ((CompiledIndexOperation)cv).getReceiver(); - if (sb.length() > 0) { - clause = "[" + sb.toString() + "]" + clause; - } - } - clause = ("." + ((CompiledPath)cv).getTailID() + clause); - } else if (cv.getType() == OQLLexerTokenTypes.METHOD_INV) { - // Function call. - clause = "." + ((CompiledOperation)cv).getMethodName() + "()" + clause; - } else { - throw new QueryException("Failed to evaluate order by attributes, found unsupported type " + cv.getType() + - " Unable to apply order-by on the partition region cumulative results."); - } - - cv = cv.getReceiver(); - } while (!(cv.getType() == OQLLexerTokenTypes.Identifier)); - - if (cv.getType() == OQLLexerTokenTypes.Identifier) { - clause = ((CompiledID)cv).getId() + clause; - // Append region iterator alias. p - if (fromPath != null) { - fromPath.append(((CompiledID)cv).getId()); - } - } - } - return clause; - } - /** * Generates a map with key as PR node and value as the list as a subset of * the bucketIds hosted by the node. @@ -1081,20 +871,12 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation //Put the failed members on the end of the list. if(failedMembers != null && !failedMembers.isEmpty()) { allNodes.removeAll(failedMembers); - //Collections.shuffle(allNodes, PartitionedRegion.rand); allNodes.addAll(failedMembers); } for (Iterator dsItr = allNodes.iterator(); dsItr.hasNext() && (bucketIds.size() < totalBucketsToQuery); ) { InternalDistributedMember nd = (InternalDistributedMember)dsItr.next(); - /* - if(taintedMembers.contains(nd)) { - //clear the tainted state - resultsPerMember.get(nd).untaint(); - } - */ - final List<Integer> buckets = new ArrayList<Integer>(); for (Integer bid : bucketIdsToConsider) { if (!bucketIds.contains(bid)) { @@ -1141,14 +923,8 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation if (this.pr.getDataStore() != null) { this.pr.getDataStore().invokeBucketReadHook(); final InternalDistributedMember me = this.pr.getMyId(); - //Create PRQueryResultCollector here. - //RQueryResultCollector resultCollector = new PRQueryResultCollector(); List<Integer> bucketList = this.node2bucketIds.get(me); - //try { - - //this.pr.getDataStore().queryLocalNode(this.query, this.parameters, - // bucketList, resultCollector); try { PRQueryProcessor qp = new PRQueryProcessor(this.pr, query, parameters, bucketList); MemberResultsList resultCollector = new MemberResultsList(); @@ -1208,167 +984,12 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation } return true; } - /* - ExecutionContext context = new ExecutionContext(parameters, this.pr.getCache()); - context.setBucketList(bucketList); - try { - SelectResults results = (SelectResults)this.query.executeUsingContext(context); - addToResultCollector(results, resultCollector, me); - } catch (BucketMovedException bme) { - return true; - } - - //this.successfulBuckets.addAll(context.getSuccessfulBuckets()); - for (Object o: context.getBucketList()) { - Integer bId = (Integer)o; - this.successfulBuckets.add(bId.intValue()); - } - - /* - } catch (ForceReattemptException retryRequired) { - return true; - } - */ - /* - int tokenCount = 0; - final int numBuckets = bucketList.size(); - // finished when we get the nth END_OF_STREAM token, where n is the number of buckets - boolean toContinue = true; - Object o = null; - while (tokenCount < numBuckets) { - o = resultCollector.get(); - if (o instanceof PRQueryProcessor.EndOfBucket) { - int bucketId = ((PRQueryProcessor.EndOfBucket)o).getBucketId(); - synchronized (this.successfulBuckets) { - this.successfulBuckets.add(bucketId); - } - tokenCount++; - } else { - if (o == DefaultQuery.NULL_RESULT) { - o = null; - } - saveDataForMember(o, me); - } - } - Assert.assertTrue(resultCollector.isEmpty()); - */ } return false; } - - /* - private void saveDataForMember(final Object data, final InternalDistributedMember member) { - TaintableArrayList existing = this.resultsPerMember.get(member); - if (existing == null) { - synchronized (member) { - existing = new TaintableArrayList(); - this.resultsPerMember.putIfAbsent(member, existing); - } - } - existing.add(data); - } - */ protected void memberStreamCorrupted(InternalDistributedMember sender) { this.resultsPerMember.remove(sender); - /* - final TaintableArrayList tainted = new TaintableArrayList(); - tainted.taint(); - TaintableArrayList existing = - (TaintableArrayList)this.resultsPerMember.putIfAbsent(sender, tainted); - if (existing != null) { - existing.taint(); - } - - ArrayList bucketIds = (ArrayList)this.node2bucketIds.get(sender); - if (bucketIds != null) { - ArrayList removedBucketIds = null; - for (Iterator i = bucketIds.iterator(); i.hasNext(); ) { - Integer bid = (Integer)i.next(); - synchronized(this.successfulBuckets) { - if (this.successfulBuckets.remove(bid.intValue())) { - if (removedBucketIds == null) { - removedBucketIds = new ArrayList(); - } - removedBucketIds.add(bid); - } - } - } - - } - */ - } - - // @todo need to throw a better exception than QueryException - /** - * Fail due to not getting all the data back for all the buckets, - * reporting which buckets failed on which nodes. - * - * @throws QueryException always throws - * since QueryException should be abstract - */ - private void failMissingBuckets() throws QueryException { - // convert to Map of nodes to bucket ids for error message - Map n2b = new HashMap(); - for (Integer bId : this.bucketsToQuery) { - InternalDistributedMember node = findNodeForBucket(bId); - List listOfInts = (List)n2b.get(node); - if (listOfInts == null) { - listOfInts = new ArrayList<Integer>(); - n2b.put(node, listOfInts); - } - listOfInts.add(bId); - } - - /* - Iterator = this.bucketsToQuery.iterator(); - int sz = intArray.length; - Map n2b = new HashMap(); - for (int i = 0; i < sz; i++) { - Integer bucketId = Integer.valueOf(intArray[i]); - InternalDistributedMember node = findNodeForBucket(bucketId); - List listOfInts = (List)n2b.get(node); - if (listOfInts == null) { - listOfInts = new ArrayList(); - n2b.put(node, listOfInts); - } - listOfInts.add(bucketId); - } - */ - - // One last check, after all else is said and done: - // if the system is closing, don't fail the query, but - // generate a much more serious error... - this.pr.getCancelCriterion().checkCancelInProgress(null); - - // the failure - String msg = "Query failed; unable to get results from the following node/buckets: " - + n2b; - - logger.fatal(msg); - throw new QueryException( // @todo what is a better exception to throw here? - msg); - - /* alternative strategy: re-query - queryBuckets(); - */ - - } - - private InternalDistributedMember findNodeForBucket(Integer bucketId) { - for (Iterator<Map.Entry<InternalDistributedMember,List<Integer>>> itr = this.node2bucketIds.entrySet().iterator(); itr.hasNext(); ) { - Map.Entry<InternalDistributedMember,List<Integer>> entry = itr.next(); - List<Integer> blist = entry.getValue(); - for (Iterator<Integer> itr2 = blist.iterator(); itr2.hasNext(); ) { - Integer bid = itr2.next(); - if (bid.equals(bucketId)) { - return (InternalDistributedMember)entry.getKey(); - } - } - } - String msg = "Unable to get node for bucket id " + bucketId + " node to bucket map is " + this.node2bucketIds; - logger.fatal(msg); - throw new InternalGemFireError(msg); } /**
