Clean up the logging and ensure the order of the shard iterator within MultiRowColumnIterator is correct. Restore the NodeShardCache logic.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4bbebc5f Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4bbebc5f Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4bbebc5f Branch: refs/heads/release-2.1.1 Commit: 4bbebc5fd759efe59bae612c9f47e36589750982 Parents: 92fae0d Author: Michael Russo <[email protected]> Authored: Tue Mar 15 21:59:21 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Tue Mar 15 21:59:21 2016 -0700 ---------------------------------------------------------------------- .../core/astyanax/MultiRowColumnIterator.java | 201 ++++++------------- .../impl/shard/impl/EdgeSearcher.java | 36 +--- .../impl/shard/impl/NodeShardCacheImpl.java | 19 +- .../shard/impl/ShardEntryGroupIterator.java | 12 ++ .../impl/shard/impl/ShardsColumnIterator.java | 10 +- .../graph/GraphManagerShardConsistencyIT.java | 54 ++--- .../graph/src/test/resources/log4j.properties | 4 + 7 files changed, 111 insertions(+), 225 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java index c384899..10786f7 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java @@ -22,7 +22,6 @@ package org.apache.usergrid.persistence.core.astyanax; import java.util.*; -import 
org.apache.avro.generic.GenericData; import org.apache.usergrid.persistence.core.shard.SmartShard; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,6 +80,10 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { private List<T> resultsTracking; + private int skipSize = 0; // used for determining if we've skipped a whole page during shard transition + + private boolean ascending = false; + /** * Remove after finding bug @@ -114,14 +117,15 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { this.moreToReturn = true; this.resultsTracking = new ArrayList<>(); - // seenResults = new HashMap<>( pageSize * 10 ); } + // temporarily use a new constructor for specific searches until we update each caller of this class public MultiRowColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf, final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser, final ColumnSearch<T> columnSearch, final Comparator<T> comparator, final Collection<R> rowKeys, final int pageSize, - final List<SmartShard> rowKeysWithShardEnd) { + final List<SmartShard> rowKeysWithShardEnd, + final boolean ascending) { this.cf = cf; this.pageSize = pageSize; this.columnParser = columnParser; @@ -133,54 +137,45 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { this.moreToReturn = true; this.rowKeysWithShardEnd = rowKeysWithShardEnd; this.resultsTracking = new ArrayList<>(); + this.ascending = ascending; - - // seenResults = new HashMap<>( pageSize * 10 ); } @Override public boolean hasNext() { - if( currentColumnIterator != null && !currentColumnIterator.hasNext() && !moreToReturn){ - if(currentShardIterator.hasNext()) { + // if column iterator is null, initialize with first call to advance() + // advance if we know there more columns exist in the current shard but we've exhausted this page fetch from c* + if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) { + 
advance(); + } + + // when there are no more columns, nothing reported to return, but more shards available, go to the next shard + if( currentColumnIterator != null && !currentColumnIterator.hasNext() && + !moreToReturn && currentShardIterator.hasNext()){ if(logger.isTraceEnabled()){ - logger.trace(Thread.currentThread().getName()+" - advancing shard iterator"); - logger.trace(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd); - logger.trace(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd); - logger.trace(Thread.currentThread().getName()+" - current shard: {}", currentShard); + logger.trace("Advancing shard iterator"); + logger.trace("Shard before advance: {}", currentShard); } + // advance to the next shard currentShard = currentShardIterator.next(); if(logger.isTraceEnabled()){ - logger.trace(Thread.currentThread().getName()+" - current shard: {}", currentShard); + logger.trace("Shard after advance: {}", currentShard); } + // reset the start column as we'll be seeking a new row, any duplicates will be filtered out startColumn = null; advance(); - } - } - - if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) { - if(currentColumnIterator != null) { - if(logger.isTraceEnabled()){ - logger.trace(Thread.currentThread().getName() + " - currentColumnIterator.hasNext()={}", currentColumnIterator.hasNext()); - - } - } - if(logger.isTraceEnabled()){ - logger.trace(Thread.currentThread().getName()+" - going into advance()"); - - } - - advance(); } + return currentColumnIterator.hasNext(); } @@ -214,24 +209,25 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { final boolean skipFirstColumn = startColumn != null; - - final int selectSize = skipFirstColumn ? 
pageSize + 1 : pageSize; - //final int selectSize = pageSize; - final RangeBuilder rangeBuilder = new RangeBuilder(); if(currentShardIterator == null){ + + // flip the order of our shards if ascending + if(ascending){ + Collections.reverse(rowKeysWithShardEnd); + } + currentShardIterator = rowKeysWithShardEnd.iterator(); } if(currentShard == null){ - Collections.reverse(rowKeysWithShardEnd); // ranges are ascending if(logger.isTraceEnabled()){ logger.trace(Thread.currentThread().getName()+" - currentShard: {}", currentShard); @@ -266,7 +262,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { rangeBuilder.setLimit( selectSize ); - if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query" ); + if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query with shard {}", currentShard ); /** * Get our list of slices @@ -285,65 +281,17 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { -// List<RowSliceQuery<R, C>> queries = new ArrayList<>(); -// -// rowKeys.forEach( rowkey -> { -// -// queries.add(keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys ) -// .withColumnRange( rangeBuilder.build() )); -// -// }); -// -// -// final List<Rows<R,C>> combinedResults = new ArrayList<>(); -// -// queries.forEach(query ->{ -// -// try { -// combinedResults.add(query.execute().getResult()); -// } -// catch ( ConnectionException e ) { -// throw new RuntimeException( "Unable to connect to casandra", e ); -// } -// -// }); - - - - - //now aggregate them together - - //this is an optimization. It's faster to see if we only have values for one row, - // then return the iterator of those columns than - //do a merge if only one row has data. 
- - final List<T> mergedResults; - mergedResults = mergeResults( result, selectSize ); - -// if ( containsSingleRowOnly( result ) ) { -// mergedResults = singleRowResult( result ); -// } -// else { -// mergedResults = mergeResults( result, selectSize ); -// } + skipSize = 0; + mergedResults = processResults( result, selectSize ); + if(logger.isTraceEnabled()){ + logger.trace("skipped amount: {}", skipSize); + } -// final List<T> mergedResults = new ArrayList<>(); -// -// combinedResults.forEach(rows -> { -// -// if ( containsSingleRowOnly( rows ) ) { -// mergedResults.addAll(singleRowResult( rows )); -// } -// else { -// mergedResults.addAll(mergeResults( rows, selectSize )); -// } -// -// }); final int size = mergedResults.size(); @@ -363,6 +311,12 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { moreToReturn = true; } + + // if a whole page is skipped, this is likely during a shard transition and we should assume there is more to read + if( skipSize == selectSize || skipSize == selectSize - 1){ + moreToReturn = true; + } + //we have a first column to to check if( size > 0) { @@ -380,21 +334,20 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { } + // set the start column for the enxt query if(moreToReturn && mergedResults.size() > 0){ startColumn = mergedResults.get( mergedResults.size() - 1 ); } - if(logger.isTraceEnabled()){ - logger.trace(Thread.currentThread().getName()+" - current shard: {}", currentShard); - logger.trace(Thread.currentThread().getName()+" - selectSize={}, size={}, ", selectSize, size); - } - - + currentColumnIterator = mergedResults.iterator(); - currentColumnIterator = mergedResults.iterator(); + //force an advance of this iterator when there are still shards to read but result set on current shard is 0 + if(size == 0 && currentShardIterator.hasNext()){ + hasNext(); + } if(logger.isTraceEnabled()){ logger.trace( @@ -404,7 +357,6 @@ public class MultiRowColumnIterator<R, C, T> implements 
Iterator<T> { } - if (logger.isTraceEnabled()) logger.trace( "Finished parsing {} rows for results", rowKeys.size() ); } @@ -464,20 +416,17 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { /** - * Multiple rows are present, merge them into a single result set + * Process the result set and filter any duplicates that may have already been seen in previous shards. During + * a shard transition, there could be the same columns in multiple shards (rows). This will also allow for + * filtering the startColumn (the seek starting point) when paging a row in Cassandra. + * * @param result * @return */ - private List<T> mergeResults( final Rows<R, C> result, final int maxSize ) { - - if (logger.isTraceEnabled()) logger.trace( "Multiple rows have columns. Merging" ); - + private List<T> processResults(final Rows<R, C> result, final int maxSize ) { final List<T> mergedResults = new ArrayList<>(maxSize); - - - for ( final R key : result.getKeys() ) { final ColumnList<C> columns = result.getRow( key ).getColumns(); @@ -486,62 +435,24 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { final T returnedValue = columnParser.parseColumn( column ); - //Use an O(log n) search, same as a tree, but with fast access to indexes for later operations + // use an O(log n) search, same as a tree, but with fast access to indexes for later operations int searchIndex = Collections.binarySearch( resultsTracking, returnedValue, comparator ); - /** - * DO NOT remove this section of code. If you're seeing inconsistent results during shard transition, - * you'll - * need to enable this - */ - // - // if ( previous != null && comparator.compare( previous, returnedValue ) == 0 ) { - // throw new RuntimeException( String.format( - // "Cassandra returned 2 unique columns, - // but your comparator marked them as equal. This " + - // "indicates a bug in your comparator. 
Previous value was %s and - // current value is " + - // "%s", - // previous, returnedValue ) ); - // } - // - // previous = returnedValue; - - //we've already seen it, no-op + + //we've already seen the column, filter it out as we might be in a shard transition or our start column if(searchIndex > -1){ if(logger.isTraceEnabled()){ logger.trace("skipping column as it was already retrieved before"); } + skipSize++; continue; } -// final int insertIndex = (searchIndex+1)*-1; -// -// //it's at the end of the list, don't bother inserting just to remove it -// if(insertIndex >= maxSize){ -// logger.info("skipping column as it was at the end of the list"); -// continue; -// } resultsTracking.add(returnedValue); - - //if (logger.isTraceEnabled()) logger.trace( "Adding value {} to merged set at index {}", returnedValue, insertIndex ); - - //mergedResults.add( insertIndex, returnedValue ); mergedResults.add(returnedValue ); - - //prune the mergedResults -// while ( mergedResults.size() > maxSize ) { -// -// if (logger.isTraceEnabled()) logger.trace( "Trimming results to size {}", maxSize ); -// -// //just remove from our tail until the size falls to the correct value -// mergedResults.remove(mergedResults.size()-1); -// resultsTracking.remove(resultsTracking.size()-1); -// -// } } if (logger.isTraceEnabled()) logger.trace( "Candidate result set size is {}", mergedResults.size() ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java index e0ba3ec..2f5817d 100644 --- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java @@ -56,9 +56,6 @@ import static org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterat */ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, ColumnSearch<T>{ - private static final Logger logger = LoggerFactory.getLogger( EdgeSearcher.class ); - - protected final Optional<T> last; protected final long maxTimestamp; protected final ApplicationScope scope; @@ -78,7 +75,6 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum this.last = last; this.comparator = comparator; - //logger.info("initializing with shards: {}", shards); } @@ -86,7 +82,6 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum public List<ScopedRowKey<R>> getRowKeys() { List<ScopedRowKey<R>> rowKeys = new ArrayList<>(shards.size()); - //logger.info("shards: {}", shards); for(Shard shard : shards){ @@ -175,37 +170,14 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum } private void setRangeOptions(final RangeBuilder rangeBuilder){ - //if we're ascending, this is opposite what cassandra sorts, so set the reversed flag + + //if we're ascending, this is opposite what cassandra sorts, so set the reversed flag final boolean reversed = order == SearchByEdgeType.Order.ASCENDING; rangeBuilder.setReversed( reversed ); } -// public class SmartShard { -// -// final ScopedRowKey<R> rowKey; -// final C shardEnd; -// -// -// public SmartShard(final ScopedRowKey<R> rowKey, final C shardEnd){ -// -// this.rowKey = rowKey; -// this.shardEnd = shardEnd; -// } -// -// -// public ScopedRowKey<R> getRowKey(){ -// return rowKey; -// } -// -// public C getShardEnd(){ -// return shardEnd; -// } -// -// } - - /** * Get the comparator * 
@return @@ -214,6 +186,10 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum return comparator; } + public SearchByEdgeType.Order getOrder(){ + return order; + } + /** * Get the column's serializer http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java index 3ff9d47..1a88ebb 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java @@ -164,19 +164,14 @@ public class NodeShardCacheImpl implements NodeShardCache { final CacheKey key = new CacheKey( scope, directedEdgeMeta ); CacheEntry entry; -// try { -// entry = this.graphs.get( key ); -// } -// catch ( ExecutionException e ) { -// throw new GraphRuntimeException( "Unable to load shard key for graph", e ); -// } - - final Iterator<ShardEntryGroup> edges = - nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta ); - - final CacheEntry cacheEntry = new CacheEntry( edges ); + try { + entry = this.graphs.get( key ); + } + catch ( ExecutionException e ) { + throw new GraphRuntimeException( "Unable to load shard key for graph", e ); + } - Iterator<ShardEntryGroup> iterator = cacheEntry.getShards( maxTimestamp ); + Iterator<ShardEntryGroup> iterator = entry.getShards( maxTimestamp ); if ( iterator == null ) { return 
Collections.<ShardEntryGroup>emptyList().iterator(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java index f1b5108..b64bb58 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java @@ -14,6 +14,8 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroup import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.schedulers.Schedulers; @@ -23,6 +25,9 @@ import rx.schedulers.Schedulers; */ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> { + private static final Logger logger = LoggerFactory.getLogger( ShardEntryGroupIterator.class ); + + private final ShardGroupCompaction shardGroupCompaction; private final PushbackIterator<Shard> sourceIterator; @@ -106,11 +111,18 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> { //we can't add this one to the entries, it doesn't fit within the delta, allocate a new one and break if ( next.addShard( shard ) ) { + + if(logger.isTraceEnabled()) { + logger.trace("adding shard: {}", shard); + } continue; } sourceIterator.pushback( shard ); + if(logger.isTraceEnabled()) { + logger.trace("unable to add shard: {}, pushing back and stopping", shard); + } break; } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java index af9d979..e609d33 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java @@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl; import java.util.*; import org.apache.usergrid.persistence.core.shard.SmartShard; +import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,15 +130,14 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> { final List<SmartShard> rowKeysWithShardEnd = searcher.getRowKeysWithShardEnd(); + final boolean ascending = searcher.getOrder() == SearchByEdgeType.Order.ASCENDING; + if (logger.isTraceEnabled()) { logger.trace("Searching with row keys {}", rowKeys); } - //currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize); - currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize, rowKeysWithShardEnd); - - - + currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, + searcher.getComparator(), rowKeys, 
pageSize, rowKeysWithShardEnd, ascending); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java index 2602e88..82b0879 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java @@ -1,22 +1,20 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. 
See the License for the - * * specific language governing permissions and limitations - * * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.usergrid.persistence.graph; @@ -205,8 +203,6 @@ public class GraphManagerShardConsistencyIT { final int numWorkersPerInjector = numProcessors / numInjectors; - //final int numWorkersPerInjector = 1; - /** @@ -218,16 +214,13 @@ public class GraphManagerShardConsistencyIT { final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors; - final long expectedShardCount = numberOfEdges / shardSize; - - createExecutor( numWorkersPerInjector ); final AtomicLong writeCounter = new AtomicLong(); //min stop time the min delta + 1 cache cycle timeout - final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout(); + final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout() + 60000; logger.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector, @@ -266,14 +259,14 @@ public class GraphManagerShardConsistencyIT { final long writeCount = writeCounter.get(); + final long expectedShardCount = writeCount / shardSize; final Meter readMeter = registry.meter( "readThroughput" ); final List<Throwable> failures = new ArrayList<>(); + //Thread.sleep(5000); - Thread.sleep(5000); - - for(int i = 0; i < 1; i ++) { + for(int i = 0; i < 2; i ++) { /** @@ -351,14 +344,9 @@ public class GraphManagerShardConsistencyIT { //we're done if ( compactedCount >= expectedShardCount ) { - logger.info( "All compactions complete, sleeping" ); - - // final Object mutex = 
new Object(); - // - // synchronized ( mutex ){ - // - // mutex.wait(); - // } + + logger.info( "All compactions complete, sleeping. Compacted shard count={}, expected shard count={}", + compactedCount, expectedShardCount ); break; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties index 608ee03..79401c3 100644 --- a/stack/corepersistence/graph/src/test/resources/log4j.properties +++ b/stack/corepersistence/graph/src/test/resources/log4j.properties @@ -37,4 +37,8 @@ log4j.logger.cassandra.db=ERROR #log4j.logger.org.apache.usergrid.persistence.core.rx=TRACE #log4j.logger.org.apache.usergrid.persistence.core.astyanax=TRACE #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.parse=TRACE +#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator=INFO +#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardsColumnIterator=INFO +#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator=INFO +#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=INFO
