Reset the original multi-row column iterator and introduce a new class for smart iteration over edge shards. Use system time for 'last write wins' in Cassandra instead of a shard's 'createdTime'.
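The 'last write wins' part of this change is worth spelling out: Cassandra resolves concurrent writes to the same column by cell timestamp, so stamping shard-metadata mutations with the shard's fixed createdTime (as the old EdgeShardSerializationImpl code did via shard.getCreatedTime()) meant a later update, such as marking a shard compacted or recording its shardEnd, could lose to an earlier write carrying the same timestamp. Stamping with the coordinator's current system time makes the newest update win. The toy model below is an illustrative sketch, not part of the commit: the class and values are hypothetical, and real Cassandra breaks exact timestamp ties by value comparison rather than dropping the write, but the hazard is the same.

    import java.util.HashMap;
    import java.util.Map;

    // Toy model of Cassandra's cell-level last-write-wins: each write carries a
    // timestamp, and a read returns the value with the highest timestamp. Reusing
    // a shard's fixed createdTime as the write timestamp means later metadata
    // updates can silently lose; System.currentTimeMillis() avoids that.
    public class LastWriteWinsSketch {

        private static final class Cell {
            final String value;
            final long timestamp;
            Cell( String value, long timestamp ) { this.value = value; this.timestamp = timestamp; }
        }

        private final Map<String, Cell> column = new HashMap<>();

        void write( String key, String value, long timestamp ) {
            Cell existing = column.get( key );
            // only a strictly newer timestamp replaces the stored cell
            if ( existing == null || timestamp > existing.timestamp ) {
                column.put( key, new Cell( value, timestamp ) );
            }
        }

        String read( String key ) {
            Cell cell = column.get( key );
            return cell == null ? null : cell.value;
        }

        public static void main( String[] args ) {
            LastWriteWinsSketch cf = new LastWriteWinsSketch();

            long createdTime = 1000L; // stands in for the shard's fixed createdTime

            // Both writes reuse the shard's createdTime: the update is dropped.
            cf.write( "shard-0", "uncompacted", createdTime );
            cf.write( "shard-0", "compacted", createdTime );
            System.out.println( cf.read( "shard-0" ) ); // still "uncompacted"

            // With current system time, the later write reliably wins.
            cf.write( "shard-0", "compacted", System.currentTimeMillis() );
            System.out.println( cf.read( "shard-0" ) ); // "compacted"
        }
    }

The EdgeShardSerializationImpl changes below apply this idea with Astyanax, setting the same current timestamp via withTimestamp() on the batch and setTimestamp() on the column mutation.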
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/bec50939 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/bec50939 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/bec50939 Branch: refs/heads/release-2.1.1 Commit: bec5093978175c87b7d76f66c8a503f062275ead Parents: 58ae197 Author: Michael Russo <[email protected]> Authored: Sun Mar 20 17:49:10 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Sun Mar 20 17:49:10 2016 -0700 ---------------------------------------------------------------------- .../core/astyanax/MultiRowColumnIterator.java | 346 ++++++-------- .../astyanax/MultiRowShardColumnIterator.java | 455 +++++++++++++++++++ .../shard/impl/EdgeShardSerializationImpl.java | 14 +- .../shard/impl/ShardGroupCompactionImpl.java | 99 ++-- .../impl/shard/impl/ShardsColumnIterator.java | 3 +- .../graph/GraphManagerShardConsistencyIT.java | 3 +- .../impl/shard/EdgeShardSerializationTest.java | 12 +- 7 files changed, 665 insertions(+), 267 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java index 6049c1f..c071d53 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java @@ -20,9 +20,14 @@ package org.apache.usergrid.persistence.core.astyanax; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; -import org.apache.usergrid.persistence.core.shard.SmartShard; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,18 +77,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { private Iterator<T> currentColumnIterator; - private Iterator<SmartShard> currentShardIterator; - - private List<SmartShard> rowKeysWithShardEnd; - - private SmartShard currentShard; - - private List<T> resultsTracking; - - private int skipSize = 0; // used for determining if we've skipped a whole page during shard transition - - private boolean ascending = false; - /** * Remove after finding bug @@ -115,63 +108,18 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { this.keyspace = keyspace; this.consistencyLevel = consistencyLevel; this.moreToReturn = true; - this.resultsTracking = new ArrayList<>(); - - } - - // temporarily use a new constructor for specific searches until we update each caller of this class - public MultiRowColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf, - final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser, - final ColumnSearch<T> columnSearch, final Comparator<T> comparator, - final Collection<R> rowKeys, final int pageSize, - final List<SmartShard> rowKeysWithShardEnd, - final boolean ascending) { - this.cf = 
cf; - this.pageSize = pageSize; - this.columnParser = columnParser; - this.columnSearch = columnSearch; - this.comparator = comparator; - this.rowKeys = rowKeys; - this.keyspace = keyspace; - this.consistencyLevel = consistencyLevel; - this.moreToReturn = true; - this.rowKeysWithShardEnd = rowKeysWithShardEnd; - this.resultsTracking = new ArrayList<>(); - this.ascending = ascending; + // seenResults = new HashMap<>( pageSize * 10 ); } @Override public boolean hasNext() { - // if column iterator is null, initialize with first call to advance() - // advance if we know there more columns exist in the current shard but we've exhausted this page fetch from c* if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) { advance(); } - // when there are no more columns, nothing reported to return, but more shards available, go to the next shard - if( currentColumnIterator != null && !currentColumnIterator.hasNext() && - !moreToReturn && currentShardIterator.hasNext()){ - - if(logger.isTraceEnabled()){ - logger.trace("Advancing shard iterator"); - logger.trace("Shard before advance: {}", currentShard); - } - - - // advance to the next shard - currentShard = currentShardIterator.next(); - - if(logger.isTraceEnabled()){ - logger.trace("Shard after advance: {}", currentShard); - - } - - advance(); - - } return currentColumnIterator.hasNext(); } @@ -198,6 +146,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { public void advance() { + if (logger.isTraceEnabled()) logger.trace( "Advancing multi row column iterator" ); /** @@ -206,130 +155,32 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { final boolean skipFirstColumn = startColumn != null; - final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize; - - final RangeBuilder rangeBuilder = new RangeBuilder(); - - - if(currentShardIterator == null){ - - // flip the order of our shards if ascending - if(ascending){ - Collections.reverse(rowKeysWithShardEnd); - } - - currentShardIterator = rowKeysWithShardEnd.iterator(); - - } - - if(currentShard == null){ - - if(logger.isTraceEnabled()){ - logger.trace("currentShard: {}", currentShard); - } - - currentShard = currentShardIterator.next(); - - if(logger.isTraceEnabled()){ - logger.trace("all shards when starting: {}", rowKeysWithShardEnd); - logger.trace("initializing iterator with shard: {}", currentShard); - } - - - } + final int selectSize = skipFirstColumn ? 
pageSize + 1 : pageSize; + final RangeBuilder rangeBuilder = new RangeBuilder(); - // initial request, build the range with no start and no end - if ( startColumn == null && currentShard.getShardEnd() == null ){ + //set the range into the search + if ( startColumn == null ) { columnSearch.buildRange( rangeBuilder ); - - if(logger.isTraceEnabled()){ - logger.trace("initial search (no start or shard end)"); - } - } - // if there's only a startColumn set the range start startColumn always - else if ( startColumn != null && currentShard.getShardEnd() == null ){ - + else { columnSearch.buildRange( rangeBuilder, startColumn, null ); - - if(logger.isTraceEnabled()){ - logger.trace("search (no shard end) with start: {}", startColumn); - } - - } - // if there's only a shardEnd, set the start/end according based on the search order - else if ( startColumn == null && currentShard.getShardEnd() != null ){ - - T shardEnd = (T) currentShard.getShardEnd(); - - // if we have a shardEnd and it's not an ascending search, use the shardEnd as a start - if(!ascending) { - - columnSearch.buildRange(rangeBuilder, shardEnd, null); - - if(logger.isTraceEnabled()){ - logger.trace("search descending with start: {}", shardEnd); - } - - } - // if we have a shardEnd and it is an ascending search, use the shardEnd as the end - else{ - - columnSearch.buildRange( rangeBuilder, null, shardEnd ); - - if(logger.isTraceEnabled()){ - logger.trace("search ascending with end: {}", shardEnd); - } - - } - } - // if there's both a startColumn and a shardEnd, decide which should be used as start/end based on search order - else if ( startColumn != null && currentShard.getShardEnd() != null) { - T shardEnd = (T) currentShard.getShardEnd(); - - - // if the search is not ascending, set the start to be the older edge - if(!ascending){ - - T searchStart = comparator.compare(shardEnd, startColumn) > 0 ? shardEnd : startColumn; - columnSearch.buildRange( rangeBuilder, searchStart, null); - - if(logger.isTraceEnabled()){ - logger.trace("search descending with start: {} in shard", searchStart, currentShard); - } - - } - // if the search is ascending, then always use the startColumn for the start and shardEnd for the range end - else{ - - columnSearch.buildRange( rangeBuilder, startColumn , shardEnd); - - if(logger.isTraceEnabled()){ - logger.trace("search with start: {}, end: {}", startColumn, shardEnd); - } - - - - } - - } rangeBuilder.setLimit( selectSize ); - if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query with shard {}", currentShard ); + if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query" ); /** * Get our list of slices */ final RowSliceQuery<R, C> query = - keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( (R) currentShard.getRowKey() ) + keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys ) .withColumnRange( rangeBuilder.build() ); final Rows<R, C> result; @@ -341,43 +192,36 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { } + //now aggregate them together + //this is an optimization. It's faster to see if we only have values for one row, + // then return the iterator of those columns than + //do a merge if only one row has data. 
- final List<T> mergedResults; - - skipSize = 0; - mergedResults = processResults( result, selectSize ); + final List<T> mergedResults; - if(logger.isTraceEnabled()){ - logger.trace("skipped amount: {}", skipSize); + if ( containsSingleRowOnly( result ) ) { + mergedResults = singleRowResult( result ); + } + else { + mergedResults = mergeResults( result, selectSize ); } - final int size = mergedResults.size(); + //we've parsed everything truncate to the first pageSize, it's all we can ensure is correct without another + //trip back to cassandra - if(logger.isTraceEnabled()){ - logger.trace("current shard: {}, retrieved size: {}", currentShard, size); - logger.trace("selectSize={}, size={}, ", selectSize, size); + //discard our first element (maybe) - } - - moreToReturn = size == selectSize; - - if(selectSize == 1001 && mergedResults.size() == 1000){ - moreToReturn = true; - } + final int size = mergedResults.size(); - // if a whole page is skipped OR the result size equals the the difference of what's skipped, - // it is likely during a shard transition and we should assume there is more to read - if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize || size == (selectSize -1) - skipSize ){ - moreToReturn = true; - } + moreToReturn = size == selectSize; //we have a first column to to check if( size > 0) { @@ -386,53 +230,93 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { //The search has either told us to skip the first element, or it matches our last, therefore we disregard it if(columnSearch.skipFirst( firstResult ) || (skipFirstColumn && comparator.compare( startColumn, firstResult ) == 0)){ - if(logger.isTraceEnabled()){ - logger.trace("removing an entry"); - - } mergedResults.remove( 0 ); } } - // set the start column for the enxt query if(moreToReturn && mergedResults.size() > 0){ startColumn = mergedResults.get( mergedResults.size() - 1 ); - } currentColumnIterator = mergedResults.iterator(); + if (logger.isTraceEnabled()) logger.trace( "Finished parsing {} rows for results", rowKeys.size() ); + } + + + /** + * Return true if we have < 2 rows with columns, false otherwise + */ + private boolean containsSingleRowOnly( final Rows<R, C> result ) { - //force an advance of this iterator when there are still shards to read but result set on current shard is 0 - if(size == 0 && currentShardIterator.hasNext()){ - hasNext(); + int count = 0; + + for ( R key : result.getKeys() ) { + if ( result.getRow( key ).getColumns().size() > 0 ) { + count++; + + //we have more than 1 row with values, return them + if ( count > 1 ) { + return false; + } + } } - if(logger.isTraceEnabled()){ - logger.trace("currentColumnIterator.hasNext()={}, " + - "moreToReturn={}, currentShardIterator.hasNext()={}", - currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext()); - } + return true; + } + + + /** + * A single row is present, only parse the single row + * @param result + * @return + */ + private List<T> singleRowResult( final Rows<R, C> result ) { + + if (logger.isTraceEnabled()) logger.trace( "Only a single row has columns. 
Parsing directly" ); + + for ( R key : result.getKeys() ) { + final ColumnList<C> columnList = result.getRow( key ).getColumns(); + + final int size = columnList.size(); + + if ( size > 0 ) { + + final List<T> results = new ArrayList<>(size); + + for(Column<C> column: columnList){ + results.add(columnParser.parseColumn( column )); + } + + return results; + } + } + + //we didn't have any results, just return nothing + return Collections.<T>emptyList(); } /** - * Process the result set and filter any duplicates that may have already been seen in previous shards. During - * a shard transition, there could be the same columns in multiple shards (rows). This will also allow for - * filtering the startColumn (the seek starting point) when paging a row in Cassandra. - * + * Multiple rows are present, merge them into a single result set * @param result * @return */ - private List<T> processResults(final Rows<R, C> result, final int maxSize ) { + private List<T> mergeResults( final Rows<R, C> result, final int maxSize ) { + + if (logger.isTraceEnabled()) logger.trace( "Multiple rows have columns. Merging" ); + final List<T> mergedResults = new ArrayList<>(maxSize); + + + for ( final R key : result.getKeys() ) { final ColumnList<C> columns = result.getRow( key ).getColumns(); @@ -441,24 +325,52 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { final T returnedValue = columnParser.parseColumn( column ); - // use an O(log n) search, same as a tree, but with fast access to indexes for later operations - int searchIndex = Collections.binarySearch( resultsTracking, returnedValue, comparator ); + //Use an O(log n) search, same as a tree, but with fast access to indexes for later operations + int searchIndex = Collections.binarySearch( mergedResults, returnedValue, comparator ); + + /** + * DO NOT remove this section of code. If you're seeing inconsistent results during shard transition, + * you'll + * need to enable this + */ + // + // if ( previous != null && comparator.compare( previous, returnedValue ) == 0 ) { + // throw new RuntimeException( String.format( + // "Cassandra returned 2 unique columns, + // but your comparator marked them as equal. This " + + // "indicates a bug in your comparator. 
Previous value was %s and + // current value is " + + // "%s", + // previous, returnedValue ) ); + // } + // + // previous = returnedValue; + + //we've already seen it, no-op + if(searchIndex > -1){ + continue; + } + final int insertIndex = (searchIndex+1)*-1; - //we've already seen the column, filter it out as we might be in a shard transition or our start column - if(searchIndex > -1){ - if(logger.isTraceEnabled()){ - logger.trace("skipping column as it was already retrieved before"); - } - skipSize++; + //it's at the end of the list, don't bother inserting just to remove it + if(insertIndex >= maxSize){ continue; } + if (logger.isTraceEnabled()) logger.trace( "Adding value {} to merged set at index {}", returnedValue, insertIndex ); + + mergedResults.add( insertIndex, returnedValue ); - resultsTracking.add(returnedValue); - mergedResults.add(returnedValue ); + //prune the mergedResults + while ( mergedResults.size() > maxSize ) { + if (logger.isTraceEnabled()) logger.trace( "Trimming results to size {}", maxSize ); + + //just remove from our tail until the size falls to the correct value + mergedResults.remove(mergedResults.size()-1); + } } if (logger.isTraceEnabled()) logger.trace( "Candidate result set size is {}", mergedResults.size() ); @@ -467,6 +379,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { return mergedResults; } -} +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java new file mode 100644 index 0000000..bfc04c4 --- /dev/null +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.core.astyanax; + + +import java.util.*; + +import org.apache.usergrid.persistence.core.shard.SmartShard; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; +import com.netflix.astyanax.model.Column; +import com.netflix.astyanax.model.ColumnFamily; +import com.netflix.astyanax.model.ColumnList; +import com.netflix.astyanax.model.ConsistencyLevel; +import com.netflix.astyanax.model.Rows; +import com.netflix.astyanax.query.RowSliceQuery; +import com.netflix.astyanax.util.RangeBuilder; + + +/** + * + * + */ +public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> { + + private static final Logger logger = LoggerFactory.getLogger( MultiRowColumnIterator.class ); + + private final int pageSize; + + private final ColumnFamily<R, C> cf; + + + private final ColumnParser<C, T> columnParser; + + private final ColumnSearch<T> columnSearch; + + private final Comparator<T> comparator; + + + private final Collection<R> rowKeys; + + private final Keyspace keyspace; + + private final ConsistencyLevel consistencyLevel; + + + private T startColumn; + + + private boolean moreToReturn; + + + private Iterator<T> currentColumnIterator; + + private Iterator<SmartShard> currentShardIterator; + + private List<SmartShard> rowKeysWithShardEnd; + + private SmartShard currentShard; + + private List<T> resultsTracking; + + private int skipSize = 0; // used for determining if we've skipped a whole page during shard transition + + private boolean ascending = false; + + + /** + * Remove after finding bug + */ + + + // private int advanceCount; + // + // private final HashMap<T, SeekPosition> seenResults; + + /** + * Complete Remove + */ + + + /** + * Create the iterator + */ + // temporarily use a new constructor for specific searches until we update each caller of this class + public MultiRowShardColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf, + final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser, + final ColumnSearch<T> columnSearch, final Comparator<T> comparator, + final Collection<R> rowKeys, final int pageSize, + final List<SmartShard> rowKeysWithShardEnd, + final boolean ascending) { + this.cf = cf; + this.pageSize = pageSize; + this.columnParser = columnParser; + this.columnSearch = columnSearch; + this.comparator = comparator; + this.rowKeys = rowKeys; + this.keyspace = keyspace; + this.consistencyLevel = consistencyLevel; + this.moreToReturn = true; + this.rowKeysWithShardEnd = rowKeysWithShardEnd; + this.resultsTracking = new ArrayList<>(); + this.ascending = ascending; + + } + + + @Override + public boolean hasNext() { + + // if column iterator is null, initialize with first call to advance() + // advance if we know there more columns exist in the current shard but we've exhausted this page fetch from c* + if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) { + advance(); + } + + // when there are no more columns, nothing reported to return, but more shards available, go to the next shard + if( currentColumnIterator != null && !currentColumnIterator.hasNext() && + !moreToReturn && currentShardIterator.hasNext()){ + + if(logger.isTraceEnabled()){ + logger.trace("Advancing shard iterator"); + logger.trace("Shard before advance: {}", currentShard); + } + + + // advance to the next shard + currentShard = currentShardIterator.next(); + + 
if(logger.isTraceEnabled()){ + logger.trace("Shard after advance: {}", currentShard); + + } + + advance(); + + } + + return currentColumnIterator.hasNext(); + } + + + @Override + public T next() { + if ( !hasNext() ) { + throw new NoSuchElementException( "No new element exists" ); + } + + final T next = currentColumnIterator.next(); + + + return next; + } + + + @Override + public void remove() { + throw new UnsupportedOperationException( "Remove is unsupported this is a read only iterator" ); + } + + + public void advance() { + + if (logger.isTraceEnabled()) logger.trace( "Advancing multi row column iterator" ); + + /** + * If the edge is present, we need to being seeking from this + */ + + final boolean skipFirstColumn = startColumn != null; + + final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize; + + final RangeBuilder rangeBuilder = new RangeBuilder(); + + + + + if(currentShardIterator == null){ + + // flip the order of our shards if ascending + if(ascending){ + Collections.reverse(rowKeysWithShardEnd); + } + + currentShardIterator = rowKeysWithShardEnd.iterator(); + + } + + if(currentShard == null){ + + if(logger.isTraceEnabled()){ + logger.trace("currentShard: {}", currentShard); + } + + currentShard = currentShardIterator.next(); + + if(logger.isTraceEnabled()){ + logger.trace("all shards when starting: {}", rowKeysWithShardEnd); + logger.trace("initializing iterator with shard: {}", currentShard); + } + + + } + + + + // initial request, build the range with no start and no end + if ( startColumn == null && currentShard.getShardEnd() == null ){ + + columnSearch.buildRange( rangeBuilder ); + + if(logger.isTraceEnabled()){ + logger.trace("initial search (no start or shard end)"); + } + + } + // if there's only a startColumn set the range start startColumn always + else if ( startColumn != null && currentShard.getShardEnd() == null ){ + + columnSearch.buildRange( rangeBuilder, startColumn, null ); + + if(logger.isTraceEnabled()){ + logger.trace("search (no shard end) with start: {}", startColumn); + } + + } + // if there's only a shardEnd, set the start/end according based on the search order + else if ( startColumn == null && currentShard.getShardEnd() != null ){ + + T shardEnd = (T) currentShard.getShardEnd(); + + // if we have a shardEnd and it's not an ascending search, use the shardEnd as a start + if(!ascending) { + + columnSearch.buildRange(rangeBuilder, shardEnd, null); + + if(logger.isTraceEnabled()){ + logger.trace("search descending with start: {}", shardEnd); + } + + } + // if we have a shardEnd and it is an ascending search, use the shardEnd as the end + else{ + + columnSearch.buildRange( rangeBuilder, null, shardEnd ); + + if(logger.isTraceEnabled()){ + logger.trace("search ascending with end: {}", shardEnd); + } + + } + + } + // if there's both a startColumn and a shardEnd, decide which should be used as start/end based on search order + else if ( startColumn != null && currentShard.getShardEnd() != null) { + + T shardEnd = (T) currentShard.getShardEnd(); + + + // if the search is not ascending, set the start to be the older edge + if(!ascending){ + + T searchStart = comparator.compare(shardEnd, startColumn) > 0 ? 
shardEnd : startColumn; + columnSearch.buildRange( rangeBuilder, searchStart, null); + + if(logger.isTraceEnabled()){ + logger.trace("search descending with start: {} in shard", searchStart, currentShard); + } + + } + // if the search is ascending, then always use the startColumn for the start and shardEnd for the range end + else{ + + columnSearch.buildRange( rangeBuilder, startColumn , shardEnd); + + if(logger.isTraceEnabled()){ + logger.trace("search with start: {}, end: {}", startColumn, shardEnd); + } + + + + } + + } + + rangeBuilder.setLimit( selectSize ); + + if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query with shard {}", currentShard ); + + /** + * Get our list of slices + */ + final RowSliceQuery<R, C> query = + keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( (R) currentShard.getRowKey() ) + .withColumnRange( rangeBuilder.build() ); + + final Rows<R, C> result; + try { + result = query.execute().getResult(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to connect to casandra", e ); + } + + + + + final List<T> mergedResults; + + skipSize = 0; + + mergedResults = processResults( result, selectSize ); + + if(logger.isTraceEnabled()){ + logger.trace("skipped amount: {}", skipSize); + } + + + + final int size = mergedResults.size(); + + + + if(logger.isTraceEnabled()){ + logger.trace("current shard: {}, retrieved size: {}", currentShard, size); + logger.trace("selectSize={}, size={}, ", selectSize, size); + + + } + + moreToReturn = size == selectSize; + + if(selectSize == 1001 && mergedResults.size() == 1000){ + moreToReturn = true; + } + + + // if a whole page is skipped OR the result size equals the the difference of what's skipped, + // it is likely during a shard transition and we should assume there is more to read + if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize || size == (selectSize -1) - skipSize ){ + moreToReturn = true; + } + + //we have a first column to to check + if( size > 0) { + + final T firstResult = mergedResults.get( 0 ); + + //The search has either told us to skip the first element, or it matches our last, therefore we disregard it + if(columnSearch.skipFirst( firstResult ) || (skipFirstColumn && comparator.compare( startColumn, firstResult ) == 0)){ + if(logger.isTraceEnabled()){ + logger.trace("removing an entry"); + + } + mergedResults.remove( 0 ); + } + + } + + + // set the start column for the enxt query + if(moreToReturn && mergedResults.size() > 0){ + startColumn = mergedResults.get( mergedResults.size() - 1 ); + + } + + + currentColumnIterator = mergedResults.iterator(); + + + //force an advance of this iterator when there are still shards to read but result set on current shard is 0 + if(size == 0 && currentShardIterator.hasNext()){ + hasNext(); + } + + if(logger.isTraceEnabled()){ + logger.trace("currentColumnIterator.hasNext()={}, " + + "moreToReturn={}, currentShardIterator.hasNext()={}", + currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext()); + } + + + } + + + /** + * Process the result set and filter any duplicates that may have already been seen in previous shards. During + * a shard transition, there could be the same columns in multiple shards (rows). This will also allow for + * filtering the startColumn (the seek starting point) when paging a row in Cassandra. 
+ * + * @param result + * @return + */ + private List<T> processResults(final Rows<R, C> result, final int maxSize ) { + + final List<T> mergedResults = new ArrayList<>(maxSize); + + for ( final R key : result.getKeys() ) { + final ColumnList<C> columns = result.getRow( key ).getColumns(); + + + for (final Column<C> column :columns ) { + + final T returnedValue = columnParser.parseColumn( column ); + + // use an O(log n) search, same as a tree, but with fast access to indexes for later operations + int searchIndex = Collections.binarySearch( resultsTracking, returnedValue, comparator ); + + + //we've already seen the column, filter it out as we might be in a shard transition or our start column + if(searchIndex > -1){ + if(logger.isTraceEnabled()){ + logger.trace("skipping column as it was already retrieved before"); + } + skipSize++; + continue; + } + + + resultsTracking.add(returnedValue); + mergedResults.add(returnedValue ); + + + } + + if (logger.isTraceEnabled()) logger.trace( "Candidate result set size is {}", mergedResults.size() ); + + } + return mergedResults; + } + +} + + http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java index d22f472..5eeeae0 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java @@ -107,8 +107,11 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization { final MutationBatch batch = keyspace.prepareMutationBatch(); - batch.withTimestamp( shard.getCreatedTime() ).withRow( EDGE_SHARDS, rowKey ) - .putColumn( shard.getShardIndex(), SHARD_SERIALIZER.toByteBuffer(shard)); + // write the row with a current timestamp so we can ensure that it's persisted with updated shard meta + long batchTimestamp = System.currentTimeMillis(); + + batch.withTimestamp( batchTimestamp ).withRow( EDGE_SHARDS, rowKey ) + .putColumn( shard.getShardIndex(), SHARD_SERIALIZER.toByteBuffer(shard)).setTimestamp(batchTimestamp); return batch; } @@ -163,8 +166,11 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization { final MutationBatch batch = keyspace.prepareMutationBatch(); - batch.withTimestamp(shard.getCreatedTime()).withRow( EDGE_SHARDS, rowKey ) - .deleteColumn( shard.getShardIndex() ); + // write the row with a current timestamp so we can ensure that it's persisted with updated shard meta + long batchTimestamp = System.currentTimeMillis(); + + batch.withTimestamp(batchTimestamp).withRow( EDGE_SHARDS, rowKey ) + .deleteColumn( shard.getShardIndex() ).setTimestamp(batchTimestamp); return batch; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java index 8728c6c..e63db46 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java @@ -161,17 +161,22 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { final int maxWorkSize = graphFig.getScanPageSize(); - final MutationBatch newRowBatch = keyspace.prepareMutationBatch(); - final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch(); - final MutationBatch updateShardMetaBatch = keyspace.prepareMutationBatch(); + /** * As we move edges, we want to keep track of it */ - long edgeCount = 0; + long totalEdgeCount = 0; for ( Shard sourceShard : sourceShards ) { + + final MutationBatch newRowBatch = keyspace.prepareMutationBatch(); + final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch(); + final MutationBatch updateShardMetaBatch = keyspace.prepareMutationBatch(); + + long edgeCount = 0; + Iterator<MarkedEdge> edges = edgeMeta .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singleton( sourceShard ), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING ); @@ -183,6 +188,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { final long edgeTimestamp = edge.getTimestamp(); + shardEnd = edge; /** * The edge is within a different shard, break @@ -203,6 +209,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { edgeCount++; + // if we're at our count, execute the mutation of writing the edges to the new row, then remove them // from the old rows if ( edgeCount % maxWorkSize == 0 ) { @@ -214,15 +221,15 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { // write the edges into the new shard atomically so we know they all succeed newRowBatch.withAtomicBatch(true).execute(); - // set the shardEnd after the write is known to be successful - shardEnd = edge; // Update the shard end after each batch so any reads during transition stay as close to current sourceShard.setShardEnd( Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp())) ); - logger.info("Updating shard {} for nodes {} with shardEnd {}", sourceShard, edgeMeta.getNodes(), shardEnd ); + if(logger.isTraceEnabled()) { + logger.trace("Updating shard {} during batch removal with shardEnd {}", sourceShard, shardEnd); + } updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta)); @@ -231,74 +238,85 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { // on purpose block this thread before deleting the old edges to be sure there are no gaps // duplicates are filtered on graph seeking so this is OK Thread.sleep(1000); - logger.info("Deleting batch of {} from old shard", maxWorkSize); - deleteRowBatch.execute(); + + if(logger.isTraceEnabled()) { + logger.trace("Deleting batch of {} from old shard", maxWorkSize); + } + deleteRowBatch.withAtomicBatch(true).execute(); + + updateShardMetaBatch.execute(); } catch ( Throwable t ) { logger.error( "Unable to move edges from shard {} to shard {}", sourceShard, 
targetShard ); } - }else { - - shardEnd = edge; + totalEdgeCount += edgeCount; + edgeCount = 0; } } - if (shardEnd != null && edgeCount > 0){ + totalEdgeCount += edgeCount; - sourceShard.setShardEnd( - Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp())) - ); + try { - logger.info("Updating shard {} for nodes {} with shardEnd {}", sourceShard, shardEnd ); - updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta)); - } + // write the edges into the new shard atomically so we know they all succeed + newRowBatch.withAtomicBatch(true).execute(); - } + // on purpose block this thread before deleting the old edges to be sure there are no gaps + // duplicates are filtered on graph seeking so this is OK + Thread.sleep(1000); + if(logger.isTraceEnabled()) { + logger.trace("Deleting remaining {} edges from old shard", edgeCount); + } + deleteRowBatch.withAtomicBatch(true).execute(); + if (shardEnd != null){ + sourceShard.setShardEnd( + Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp())) + ); - try { + if(logger.isTraceEnabled()) { + logger.trace("Updating for last time shard {} with shardEnd {}", sourceShard, shardEnd); + } + updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta)); + updateShardMetaBatch.execute(); + } - // write the edges into the new shard atomically so we know they all succeed - newRowBatch.withAtomicBatch(true).execute(); - // on purpose block this thread before deleting the old edges to be sure there are no gaps - // duplicates are filtered on graph seeking so this is OK - Thread.sleep(1000); + } + catch ( Throwable t ) { + logger.error( "Unable to move edges to target shard {}", targetShard ); + } - logger.info("Deleting remaining edges from old shard"); - deleteRowBatch.execute(); - // now update with our shard end - updateShardMetaBatch.execute(); } - catch ( Throwable t ) { - logger.error( "Unable to move edges to target shard {}", targetShard ); - } + + if (logger.isTraceEnabled()) { - logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, edgeCount); + logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount); } - logger.info("Finished compacting {} shards and moved {} edges", sourceShards, edgeCount); + + logger.info("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount); - resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard ); + resultBuilder.withCopiedEdges( totalEdgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard ); /** * We didn't move anything this pass, mark the shard as compacted. If we move something, * it means that we missed it on the first pass * or someone is still not writing to the target shard only. */ - if ( edgeCount == 0 ) { + if ( totalEdgeCount == 0 ) { //now that we've marked our target as compacted, we can successfully remove any shards that are not @@ -329,11 +347,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { throw new RuntimeException( "Unable to connect to cassandra", e ); } - - logger.info( "Shard has been fully compacted. 
Marking shard {} as compacted in Cassandra", targetShard ); - //Overwrite our shard index with a newly created one that has been marked as compacted Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true ); + compactedShard.setShardEnd(targetShard.getShardEnd()); + + logger.info( "Shard has been fully compacted. Marking shard {} as compacted in Cassandra", compactedShard ); + final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta ); try { http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java index e609d33..e2dd549 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java @@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl; import java.util.*; +import org.apache.usergrid.persistence.core.astyanax.MultiRowShardColumnIterator; import org.apache.usergrid.persistence.core.shard.SmartShard; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.slf4j.Logger; @@ -136,7 +137,7 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> { logger.trace("Searching with row keys {}", rowKeys); } - currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, + currentColumnIterator = new MultiRowShardColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize, rowKeysWithShardEnd, ascending); http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java index 9e6996d..e7027f4 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java @@ -82,6 +82,7 @@ import static org.junit.Assert.fail; public class GraphManagerShardConsistencyIT { private static final Logger logger = LoggerFactory.getLogger( GraphManagerShardConsistencyIT.class ); + private static final MetricRegistry registry = new MetricRegistry(); private static final Meter writeMeter = registry.meter( "writeThroughput" ); @@ -102,7 +103,7 @@ public class GraphManagerShardConsistencyIT { protected ListeningExecutorService deleteExecutor; - protected int TARGET_NUM_SHARDS = 6; + protected int TARGET_NUM_SHARDS = 5; 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java index 1f8bfa9..145aa03 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java @@ -209,7 +209,7 @@ public class EdgeShardSerializationTest { } @Test - public void sameShardIndexRemoval() throws ConnectionException { + public void testShardDelete() throws ConnectionException { final Id now = IdGenerator.createId( "test" ); @@ -217,11 +217,15 @@ public class EdgeShardSerializationTest { final Shard shard1 = new Shard( 1000L, timestamp, false ); final Shard shard2 = new Shard( shard1.getShardIndex(), timestamp * 2, true ); + final Shard shard3 = new Shard( shard2.getShardIndex() * 2, timestamp * 3, true ); + final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( now, "edgeType", "subType" ); MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, shard1, sourceEdgeMeta ); batch.mergeShallow( edgeShardSerialization.writeShardMeta( scope, shard2, sourceEdgeMeta ) ); + batch.mergeShallow( edgeShardSerialization.writeShardMeta( scope, shard3, sourceEdgeMeta ) ); + batch.execute(); @@ -229,16 +233,16 @@ public class EdgeShardSerializationTest { edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta ); // Latest timestamp comes first - assertEquals( shard2, results.next() ); + assertEquals( shard3, results.next() ); // This should now not remove anything - edgeShardSerialization.removeShardMeta( scope, shard1, sourceEdgeMeta ).execute(); + edgeShardSerialization.removeShardMeta( scope, shard3, sourceEdgeMeta ).execute(); // Get iterator again results = edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta ); - // We should still have shard3 stored + // We should still have shard2 stored assertEquals( shard2, results.next() );
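Editor's note on the new iterator: the core of MultiRowShardColumnIterator is processResults(), which reads one shard (row) at a time and filters out columns already returned by a previous shard, since rows can contain overlapping columns during a shard transition. The standalone sketch below isolates the binary-search deduplication and the skipSize bookkeeping; it is an assumption-laden illustration, with Long values standing in for parsed edge columns and the two hard-coded 'shards' invented for the example.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    // Sketch of the duplicate filtering in processResults(): columns already
    // returned by an earlier shard are skipped via an O(log n) binary search
    // against the ordered list of everything returned so far (resultsTracking).
    public class ShardDedupSketch {

        public static void main( String[] args ) {
            // descending order, like edge timestamps in a DESCENDING search
            Comparator<Long> comparator = Comparator.reverseOrder();

            List<Long> resultsTracking = new ArrayList<>();
            int skipSize = 0;

            // Two shards whose contents overlap around the transition point.
            List<List<Long>> shards = List.of(
                List.of( 9L, 8L, 7L ),
                List.of( 8L, 7L, 6L, 5L ) );

            List<Long> merged = new ArrayList<>();

            for ( List<Long> shard : shards ) {
                for ( Long column : shard ) {
                    int searchIndex = Collections.binarySearch( resultsTracking, column, comparator );
                    if ( searchIndex > -1 ) {
                        skipSize++; // already seen in an earlier shard; filter it out
                        continue;
                    }
                    // columns arrive in comparator order, so appending keeps
                    // resultsTracking sorted for the next binary search
                    resultsTracking.add( column );
                    merged.add( column );
                }
            }

            System.out.println( merged );   // [9, 8, 7, 6, 5]
            System.out.println( skipSize ); // 2
        }
    }

skipSize matters because advance() uses it when deciding moreToReturn: if a whole page was filtered as duplicates, or the surviving result size equals the page size minus what was skipped, the iterator assumes it is mid-transition and keeps reading rather than terminating early.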
