Repository: usergrid Updated Branches: refs/heads/release-2.1.1 843578310 -> e64fa3503
Initial work to iterate over the shards with more context so we don't fetch all rows (shards) at the same time always. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8c725f19 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8c725f19 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8c725f19 Branch: refs/heads/release-2.1.1 Commit: 8c725f19aa30a1cca5b71017c8a43586b6e4d544 Parents: 8435783 Author: Michael Russo <[email protected]> Authored: Mon Mar 14 16:07:19 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Mon Mar 14 16:07:19 2016 -0700 ---------------------------------------------------------------------- .../asyncevents/AsyncEventServiceImpl.java | 12 +- .../usergrid/corepersistence/index/RxTest.java | 129 ++++++++++ .../core/astyanax/MultiRowColumnIterator.java | 253 +++++++++++++++---- .../persistence/core/shard/SmartShard.java | 52 ++++ .../graph/serialization/impl/shard/Shard.java | 33 ++- .../impl/shard/impl/EdgeSearcher.java | 86 ++++++- .../shard/impl/NodeShardAllocationImpl.java | 4 +- .../impl/shard/impl/NodeShardCacheImpl.java | 19 +- .../shard/impl/ShardGroupCompactionImpl.java | 19 ++ .../impl/ShardedEdgeSerializationImpl.java | 9 + .../impl/shard/impl/ShardsColumnIterator.java | 30 ++- .../graph/GraphManagerShardConsistencyIT.java | 17 +- 12 files changed, 583 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 4d78340..7e368c7 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.avro.generic.GenericData; import org.apache.usergrid.persistence.index.impl.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -519,6 +520,13 @@ public class AsyncEventServiceImpl implements AsyncEventService { } + // don't let this continue if there's nothing to index + if (indexOperationMessage == null || indexOperationMessage.isEmpty()){ + throw new RuntimeException( + "IndexOperationMessage cannot be null or empty after retrieving from map persistence"); + } + + // always do a check to ensure the indexes are initialized for the index requests initializeEntityIndexes(indexOperationMessage); @@ -739,9 +747,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { */ private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) { - // if nothing came back then return null + // if nothing came back then return empty list if(indexEventResults==null){ - return null; + return new ArrayList<>(0); } IndexOperationMessage combined = new IndexOperationMessage(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java index 6bb8947..f44c028 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java @@ -20,14 +20,21 @@ package org.apache.usergrid.corepersistence.index; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.avro.generic.GenericData; import org.apache.usergrid.ExperimentalTest; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Observable; import rx.Subscription; import rx.observables.ConnectableObservable; @@ -42,6 +49,9 @@ import static org.junit.Assert.assertTrue; */ public class RxTest { + private static final Logger logger = LoggerFactory.getLogger(RxTest.class); + + @Test @Category(ExperimentalTest.class ) public void testPublish() throws InterruptedException { @@ -107,5 +117,124 @@ public class RxTest { assertEquals(0, result); } + @Test + public void testStreamWithinObservable(){ + + List<Integer> numbers = new ArrayList<Integer>(5){{ + add(1); + add(2); + add(3); + add(4); + add(5); + }}; + + Observable.just(numbers).map( integers -> { + + try{ + + logger.info("Starting size: {}", String.valueOf(numbers.size())); + + List<StreamResult> results = callStream(integers); + + logger.info("In process size: {}", String.valueOf(results.size())); + + List<Integer> checked = checkResults(results); + + logger.info("Resulting Size: {}", String.valueOf(checked.size())); + + return results; + + } + catch(Exception e){ + + logger.info("Caught exception in observable: {}", e.getMessage()); + return null; + + + } + + }).subscribe(); + + + + + + + + } + + private List<StreamResult> callStream (final List<Integer> input){ + + Stream<StreamResult> results = input.stream().map(integer -> { + + try{ + + + + if(integer.equals(1) || integer.equals(2)){ + throwSomeException("Ah integer not what we want!"); + } + + return new StreamResult(integer); + + } + catch(Exception e){ + + logger.info("Caught exception in stream: '{}'", e.getMessage()); + return new StreamResult(0); + + } + + }); + + return results.collect(Collectors.toList()); + + } + + + private List<Integer> checkResults(final List<StreamResult> streamResults){ + + List<Integer> combined = new ArrayList<>(); + List<Integer> integers = streamResults.stream().filter( streamResult -> streamResult.getNumber() > 0) + .map(streamResult -> { + + combined.add(streamResult.getNumber()); + + return streamResult.getNumber(); + }) + .collect(Collectors.toList()); + + Observable.from(combined).map( s -> { + logger.info("Doing work in another observable with Integer: {}", s); + return s; + }).toBlocking().last(); + + + return integers; + + } + + + public class StreamResult { + + private int number; + + public StreamResult( final int number){ + + this.number = number; + } + + public int getNumber(){ + return number; + } + + + } + + public void throwSomeException(String message){ + + throw new RuntimeException(message); + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java index a120fda..9971fba 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java @@ -1,35 +1,29 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.usergrid.persistence.core.astyanax; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; +import java.util.*; +import org.apache.avro.generic.GenericData; +import org.apache.usergrid.persistence.core.shard.SmartShard; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,6 +73,14 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { private Iterator<T> currentColumnIterator; + private Iterator<SmartShard> currentShardIterator; + + private List<SmartShard> rowKeysWithShardEnd; + + private SmartShard currentShard; + + private List<T> resultsTracking; + /** * Remove after finding bug @@ -110,6 +112,28 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { this.keyspace = keyspace; this.consistencyLevel = consistencyLevel; this.moreToReturn = true; + this.resultsTracking = new ArrayList<>(); + + // seenResults = new HashMap<>( pageSize * 10 ); + } + + public MultiRowColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf, + final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser, + final ColumnSearch<T> columnSearch, final Comparator<T> comparator, + final Collection<R> rowKeys, final int pageSize, + final List<SmartShard> rowKeysWithShardEnd) { + this.cf = cf; + this.pageSize = pageSize; + this.columnParser = columnParser; + this.columnSearch = columnSearch; + this.comparator = comparator; + this.rowKeys = rowKeys; + this.keyspace = keyspace; + this.consistencyLevel = consistencyLevel; + this.moreToReturn = true; + this.rowKeysWithShardEnd = rowKeysWithShardEnd; + this.resultsTracking = new ArrayList<>(); + // seenResults = new HashMap<>( pageSize * 10 ); } @@ -117,12 +141,34 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { @Override public boolean hasNext() { + //logger.info(Thread.currentThread().getName()+" - calling hasNext()"); + if( currentColumnIterator != null && !currentColumnIterator.hasNext() && !moreToReturn){ + if(currentShardIterator.hasNext()) { + logger.info(Thread.currentThread().getName()+" - advancing shard iterator"); + //logger.info(Thread.currentThread().getName()+" - currentColumnIterator.hasNext()={}", currentColumnIterator.hasNext()); + logger.info(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd); + //Collections.reverse(rowKeysWithShardEnd); + logger.info(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd); + + logger.info(Thread.currentThread().getName()+" - current shard: {}", currentShard); + currentShard = currentShardIterator.next(); + logger.info(Thread.currentThread().getName()+" - current shard: {}", currentShard); + startColumn = null; + + advance(); + } + } if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) { - advance(); - } + if(currentColumnIterator != null) { + logger.info(Thread.currentThread().getName() + " - currentColumnIterator.hasNext()={}", currentColumnIterator.hasNext()); + } + logger.info(Thread.currentThread().getName()+" - moreToReturn={}", moreToReturn); + logger.info(Thread.currentThread().getName()+" - going into advance()"); + advance(); + } return currentColumnIterator.hasNext(); } @@ -148,7 +194,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { public void advance() { - + logger.info( "Advancing multi row column iterator" ); if (logger.isTraceEnabled()) logger.trace( "Advancing multi row column iterator" ); /** @@ -161,11 +207,33 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize; + //final int selectSize = pageSize; + final RangeBuilder rangeBuilder = new RangeBuilder(); - //set the range into the search + + if(currentShardIterator == null){ + currentShardIterator = rowKeysWithShardEnd.iterator(); + + } + + if(currentShard == null){ + Collections.reverse(rowKeysWithShardEnd); // ranges are ascending + logger.info(Thread.currentThread().getName()+" - currentShard: {}", currentShard); + currentShard = currentShardIterator.next(); + logger.info(Thread.currentThread().getName()+" - all shards when starting: {}", rowKeysWithShardEnd); + logger.info(Thread.currentThread().getName()+" - initializing iterator with shard: {}", currentShard); + + } + + + + + + //set the range into the search + logger.info(Thread.currentThread().getName()+" - startColumn={}", startColumn); if ( startColumn == null ) { columnSearch.buildRange( rangeBuilder ); } @@ -181,9 +249,10 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { /** * Get our list of slices */ + //logger.info("shard: {}, end: {}",currentShard.getRowKey().getKey(), currentShard.getShardEnd()); final RowSliceQuery<R, C> query = - keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys ) - .withColumnRange( rangeBuilder.build() ); + keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( (R) currentShard.getRowKey() ) + .withColumnRange( rangeBuilder.build() ); final Rows<R, C> result; try { @@ -194,6 +263,33 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { } + +// List<RowSliceQuery<R, C>> queries = new ArrayList<>(); +// +// rowKeys.forEach( rowkey -> { +// +// queries.add(keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys ) +// .withColumnRange( rangeBuilder.build() )); +// +// }); +// +// +// final List<Rows<R,C>> combinedResults = new ArrayList<>(); +// +// queries.forEach(query ->{ +// +// try { +// combinedResults.add(query.execute().getResult()); +// } +// catch ( ConnectionException e ) { +// throw new RuntimeException( "Unable to connect to casandra", e ); +// } +// +// }); + + + + //now aggregate them together //this is an optimization. It's faster to see if we only have values for one row, @@ -201,14 +297,34 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { //do a merge if only one row has data. + final List<T> mergedResults; - if ( containsSingleRowOnly( result ) ) { - mergedResults = singleRowResult( result ); - } - else { - mergedResults = mergeResults( result, selectSize ); - } + mergedResults = mergeResults( result, selectSize ); + +// if ( containsSingleRowOnly( result ) ) { +// mergedResults = singleRowResult( result ); +// } +// else { +// mergedResults = mergeResults( result, selectSize ); +// } + + + +// final List<T> mergedResults = new ArrayList<>(); +// +// combinedResults.forEach(rows -> { +// +// if ( containsSingleRowOnly( rows ) ) { +// mergedResults.addAll(singleRowResult( rows )); +// } +// else { +// mergedResults.addAll(mergeResults( rows, selectSize )); +// } +// +// }); + + @@ -223,8 +339,20 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { final int size = mergedResults.size(); + + + if(logger.isTraceEnabled()){ + logger.trace(Thread.currentThread().getName()+" - current shard: {}, retrieved size: {}", currentShard, size); + + } + + logger.info(Thread.currentThread().getName()+" - selectSize={}, size={}, ", selectSize, size); moreToReturn = size == selectSize; +// if(selectSize == 1001 && mergedResults.size() == 1000){ +// moreToReturn = true; +// } + //we have a first column to to check if( size > 0) { @@ -232,6 +360,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { //The search has either told us to skip the first element, or it matches our last, therefore we disregard it if(columnSearch.skipFirst( firstResult ) || (skipFirstColumn && comparator.compare( startColumn, firstResult ) == 0)){ + logger.info("removing an entry"); mergedResults.remove( 0 ); } @@ -240,10 +369,25 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { if(moreToReturn && mergedResults.size() > 0){ startColumn = mergedResults.get( mergedResults.size() - 1 ); + } + logger.info(Thread.currentThread().getName()+" - current shard: {}", currentShard); + logger.info(Thread.currentThread().getName()+" - selectSize={}, size={}, ", selectSize, size); + + +// if(mergedResults.size() == 0 && currentShardIterator.hasNext()){ +// //currentShard = currentShardIterator.next(); +// +// } + currentColumnIterator = mergedResults.iterator(); + //logger.info(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd); + logger.info( + Thread.currentThread().getName()+" - currentColumnIterator.hasNext()={}, " + + "moreToReturn={}, currentShardIterator.hasNext()={}", + currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext()); if (logger.isTraceEnabled()) logger.trace( "Finished parsing {} rows for results", rowKeys.size() ); } @@ -328,7 +472,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { final T returnedValue = columnParser.parseColumn( column ); //Use an O(log n) search, same as a tree, but with fast access to indexes for later operations - int searchIndex = Collections.binarySearch( mergedResults, returnedValue, comparator ); + int searchIndex = Collections.binarySearch( resultsTracking, returnedValue, comparator ); /** * DO NOT remove this section of code. If you're seeing inconsistent results during shard transition, @@ -350,29 +494,37 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { //we've already seen it, no-op if(searchIndex > -1){ + logger.info("skipping column as it was already retrieved before"); continue; } - final int insertIndex = (searchIndex+1)*-1; +// final int insertIndex = (searchIndex+1)*-1; +// +// //it's at the end of the list, don't bother inserting just to remove it +// if(insertIndex >= maxSize){ +// logger.info("skipping column as it was at the end of the list"); +// continue; +// } - //it's at the end of the list, don't bother inserting just to remove it - if(insertIndex >= maxSize){ - continue; - } + resultsTracking.add(returnedValue); - if (logger.isTraceEnabled()) logger.trace( "Adding value {} to merged set at index {}", returnedValue, insertIndex ); + //if (logger.isTraceEnabled()) logger.trace( "Adding value {} to merged set at index {}", returnedValue, insertIndex ); - mergedResults.add( insertIndex, returnedValue ); + //mergedResults.add( insertIndex, returnedValue ); + mergedResults.add(returnedValue ); - //prune the mergedResults - while ( mergedResults.size() > maxSize ) { - if (logger.isTraceEnabled()) logger.trace( "Trimming results to size {}", maxSize ); - - //just remove from our tail until the size falls to the correct value - mergedResults.remove(mergedResults.size()-1); - } + //prune the mergedResults +// while ( mergedResults.size() > maxSize ) { +// +// if (logger.isTraceEnabled()) logger.trace( "Trimming results to size {}", maxSize ); +// +// //just remove from our tail until the size falls to the correct value +// mergedResults.remove(mergedResults.size()-1); +// resultsTracking.remove(resultsTracking.size()-1); +// +// } } if (logger.isTraceEnabled()) logger.trace( "Candidate result set size is {}", mergedResults.size() ); @@ -381,7 +533,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { return mergedResults; } - } http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java new file mode 100644 index 0000000..8a1bee8 --- /dev/null +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.core.shard; + +import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; + +public class SmartShard<R, C> { + + final ScopedRowKey<R> rowKey; + final C shardEnd; + + + public SmartShard(final ScopedRowKey<R> rowKey, final C shardEnd){ + + this.rowKey = rowKey; + this.shardEnd = shardEnd; + } + + + public ScopedRowKey<R> getRowKey(){ + return rowKey; + } + + public C getShardEnd(){ + return shardEnd; + } + + @Override + public String toString(){ + + return "Shard { rowKey="+rowKey + ", shardEnd="+shardEnd+" }"; + + + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java index 472e0a2..92793cb 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java @@ -19,6 +19,9 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard; +import com.google.common.base.Optional; +import org.apache.usergrid.persistence.graph.Edge; + public class Shard implements Comparable<Shard> { @@ -30,12 +33,14 @@ public class Shard implements Comparable<Shard> { private final long shardIndex; private final long createdTime; private final boolean compacted; + private Optional<Edge> shardEnd; public Shard( final long shardIndex, final long createdTime, final boolean compacted ) { this.shardIndex = shardIndex; this.createdTime = createdTime; this.compacted = compacted; + this.shardEnd = Optional.absent(); } @@ -71,6 +76,14 @@ public class Shard implements Comparable<Shard> { return shardIndex == MIN_SHARD.shardIndex; } + public void setShardEnd(final Optional<Edge> shardEnd) { + this.shardEnd = shardEnd; + } + + public Optional<Edge> getShardEnd() { + return shardEnd; + } + /** * Compare the shards based on the timestamp first, then the created time second @@ -149,10 +162,20 @@ public class Shard implements Comparable<Shard> { @Override public String toString() { - return "Shard{" + - "shardIndex=" + shardIndex + - ", createdTime=" + createdTime + - ", compacted=" + compacted + - '}'; + + StringBuilder string = new StringBuilder(); + string.append("Shard{ "); + string.append("shardIndex=").append(shardIndex); + string.append(", createdTime=").append(createdTime); + string.append(", compacted=").append(compacted); + string.append(", shardEndTimestamp="); + if(shardEnd.isPresent()){ + string.append(shardEnd.get().getTimestamp()); + }else{ + string.append("null"); + } + string.append(" }"); + + return string.toString(); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java index 4d02ba9..e0ba3ec 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java @@ -1,16 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.List; +import java.util.*; +import org.apache.http.cookie.SM; import org.apache.usergrid.persistence.core.astyanax.ColumnParser; import org.apache.usergrid.persistence.core.astyanax.ColumnSearch; +import org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator; import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.shard.SmartShard; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; @@ -22,6 +40,10 @@ import com.google.common.base.Preconditions; import com.netflix.astyanax.Serializer; import com.netflix.astyanax.model.Column; import com.netflix.astyanax.util.RangeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator.*; /** @@ -34,6 +56,9 @@ import com.netflix.astyanax.util.RangeBuilder; */ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, ColumnSearch<T>{ + private static final Logger logger = LoggerFactory.getLogger( EdgeSearcher.class ); + + protected final Optional<T> last; protected final long maxTimestamp; protected final ApplicationScope scope; @@ -52,6 +77,8 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum this.shards = shards; this.last = last; this.comparator = comparator; + + //logger.info("initializing with shards: {}", shards); } @@ -59,6 +86,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum public List<ScopedRowKey<R>> getRowKeys() { List<ScopedRowKey<R>> rowKeys = new ArrayList<>(shards.size()); + //logger.info("shards: {}", shards); for(Shard shard : shards){ @@ -72,6 +100,33 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum return rowKeys; } + public List<SmartShard> getRowKeysWithShardEnd(){ + + + final List<SmartShard> rowKeysWithShardEnd = new ArrayList<>(shards.size()); + + for(Shard shard : shards){ + + final ScopedRowKey< R> rowKey = ScopedRowKey + .fromKey( scope.getApplication(), generateRowKey(shard.getShardIndex() ) ); + + final C shardEnd; + if(shard.getShardEnd().isPresent()){ + shardEnd = createColumn((T) shard.getShardEnd().get()); + + }else{ + shardEnd = null; + } + + + + rowKeysWithShardEnd.add(new SmartShard(rowKey, shardEnd)); + } + + return rowKeysWithShardEnd; + + } + @Override public boolean skipFirst( final T first ) { @@ -127,6 +182,29 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum } +// public class SmartShard { +// +// final ScopedRowKey<R> rowKey; +// final C shardEnd; +// +// +// public SmartShard(final ScopedRowKey<R> rowKey, final C shardEnd){ +// +// this.rowKey = rowKey; +// this.shardEnd = shardEnd; +// } +// +// +// public ScopedRowKey<R> getRowKey(){ +// return rowKey; +// } +// +// public C getShardEnd(){ +// return shardEnd; +// } +// +// } + /** * Get the comparator http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java index a79b91a..6f95cf5 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java @@ -98,13 +98,14 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { else { existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta ); + //logger.info("existing shards has something: {}", existingShards.hasNext()); /** * We didn't get anything out of cassandra, so we need to create the minumum shard */ if ( existingShards == null || !existingShards.hasNext() ) { - + //logger.info("writing min shard"); final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta ); try { batch.execute(); @@ -117,6 +118,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { } } + //logger.info("getShards existing shards: {}", existingShards); return new ShardEntryGroupIterator( existingShards, graphFig.getShardMinDelta(), shardGroupCompaction, scope, directedEdgeMeta ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java index 1a88ebb..3ff9d47 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java @@ -164,14 +164,19 @@ public class NodeShardCacheImpl implements NodeShardCache { final CacheKey key = new CacheKey( scope, directedEdgeMeta ); CacheEntry entry; - try { - entry = this.graphs.get( key ); - } - catch ( ExecutionException e ) { - throw new GraphRuntimeException( "Unable to load shard key for graph", e ); - } +// try { +// entry = this.graphs.get( key ); +// } +// catch ( ExecutionException e ) { +// throw new GraphRuntimeException( "Unable to load shard key for graph", e ); +// } + + final Iterator<ShardEntryGroup> edges = + nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta ); + + final CacheEntry cacheEntry = new CacheEntry( edges ); - Iterator<ShardEntryGroup> iterator = entry.getShards( maxTimestamp ); + Iterator<ShardEntryGroup> iterator = cacheEntry.getShards( maxTimestamp ); if ( iterator == null ) { return Collections.<ShardEntryGroup>emptyList().iterator(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java index f0b0ac9..80b63ec 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java @@ -36,6 +36,8 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; +import com.google.common.base.Optional; +import org.apache.usergrid.persistence.graph.Edge; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,6 +171,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { final MutationBatch newRowBatch = keyspace.prepareMutationBatch(); final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch(); + final MutationBatch updateShardMetaBatch = keyspace.prepareMutationBatch(); /** * As we move edges, we want to keep track of it @@ -181,10 +184,13 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singleton( sourceShard ), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING ); + MarkedEdge shardEnd = null; + while ( edges.hasNext() ) { final MarkedEdge edge = edges.next(); final long edgeTimestamp = edge.getTimestamp(); + shardEnd = edge; /** * The edge is within a different shard, break @@ -202,6 +208,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { .deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge, timestamp ) ); + edgeCount++; //if we're at our count, execute the mutation of writing the edges to the new row, then remove them @@ -217,12 +224,21 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { } } } + + Shard updatedShard = new Shard( sourceShard.getShardIndex(), sourceShard.getCreatedTime(), sourceShard.isCompacted() ); + updatedShard.setShardEnd(Optional.fromNullable(shardEnd)); + logger.info("updating with shard end: {}", shardEnd ); + updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, updatedShard, edgeMeta)); + } + + try { newRowBatch.execute(); deleteRowBatch.execute(); + updateShardMetaBatch.execute(); } catch ( Throwable t ) { logger.error( "Unable to move edges to target shard {}", targetShard ); @@ -232,6 +248,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { if (logger.isTraceEnabled()) { logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, edgeCount); } + logger.info("Finished compacting {} shards and moved {} edges", sourceShards, edgeCount); + resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard ); @@ -276,6 +294,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { //Overwrite our shard index with a newly created one that has been marked as compacted Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true ); + final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta ); try { updateMark.execute(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java index ce0953c..c3e0cc0 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java @@ -67,6 +67,8 @@ import com.netflix.astyanax.Keyspace; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.Serializer; import com.netflix.astyanax.util.RangeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.checkNotNull; @@ -77,6 +79,9 @@ import static com.google.common.base.Preconditions.checkNotNull; @Singleton public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { + private static final Logger logger = LoggerFactory.getLogger( ShardedEdgeSerializationImpl.class ); + + protected final Keyspace keyspace; protected final CassandraConfig cassandraConfig; protected final GraphFig graphFig; @@ -401,6 +406,10 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { ValidationUtils.validateApplicationScope( scope ); GraphValidation.validateSearchByEdgeType( search ); + if(logger.isTraceEnabled()){ + logger.info("getEdgesFromSource shards: {}", shards); + } + final Id sourceId = search.getNode(); final String type = search.getType(); final long maxTimestamp = search.getMaxTimestamp(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java index d1000fb..af9d979 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java @@ -1,10 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; +import java.util.*; +import org.apache.usergrid.persistence.core.shard.SmartShard; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,14 +127,19 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> { */ final List<ScopedRowKey<R>> rowKeys = searcher.getRowKeys(); + final List<SmartShard> rowKeysWithShardEnd = searcher.getRowKeysWithShardEnd(); + if (logger.isTraceEnabled()) { logger.trace("Searching with row keys {}", rowKeys); } - currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize); + //currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize); + currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize, rowKeysWithShardEnd); + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java index 3ae3ff1..b131e95 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java @@ -203,7 +203,10 @@ public class GraphManagerShardConsistencyIT { // power for writes final int numProcessors = Runtime.getRuntime().availableProcessors() / 2; - final int numWorkersPerInjector = numProcessors / numInjectors; + //final int numWorkersPerInjector = numProcessors / numInjectors; + + final int numWorkersPerInjector = 1; + /** @@ -268,7 +271,9 @@ public class GraphManagerShardConsistencyIT { final List<Throwable> failures = new ArrayList<>(); - for(int i = 0; i < 2; i ++) { + Thread.sleep(5000); + + for(int i = 0; i < 1; i ++) { /** @@ -656,7 +661,7 @@ public class GraphManagerShardConsistencyIT { final long startTime = System.currentTimeMillis(); - for ( long i = 0; i < writeLimit || System.currentTimeMillis() - startTime < minExecutionTime; i++ ) { + for ( long i = 1; i < writeLimit +1 && System.currentTimeMillis() - startTime < minExecutionTime; i++ ) { Edge edge = generator.newEdge(); @@ -671,8 +676,8 @@ public class GraphManagerShardConsistencyIT { writeCounter.incrementAndGet(); - if ( i % 1000 == 0 ) { - logger.info( " Wrote: " + i ); + if ( i % 100 == 0 ) { + logger.info( Thread.currentThread().getName()+" wrote: " + i ); } } @@ -718,7 +723,7 @@ public class GraphManagerShardConsistencyIT { logger.info( "Completed reading {} edges", returnedEdgeCount ); if ( writeCount != returnedEdgeCount ) { - logger.warn( "Unexpected edge count returned!!! Expected {} but was {}", writeCount, + logger.warn( Thread.currentThread().getName()+" - Unexpected edge count returned!!! Expected {} but was {}", writeCount, returnedEdgeCount ); }
