http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java new file mode 100644 index 0000000..d598a2e --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.graph; + + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; +import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; +import org.apache.usergrid.corepersistence.pipeline.read.Filter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; +import com.google.inject.Inject; + +import rx.Observable; + + +/** + * Loads entities from a set of Ids. + * + * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification + */ +public class EntityLoadFilter extends AbstractFilter<Id, Entity> implements Filter<Id, Entity> { + + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + + + @Inject + public EntityLoadFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) { + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + } + + + @Override + public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) { + + + final EntityCollectionManager entityCollectionManager = + entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() ); + + //it's more efficient to make 1 network hop to get everything, then drop our results if required + final Observable<FilterResult<Entity>> entityObservable = + filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> { + + final Observable<EntitySet> entitySetObservable = + Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList() + .flatMap( ids -> entityCollectionManager.load( ids ) ); + + + //now we have a collection, validate our canidate set is correct. + + return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) ) + .doOnNext( entityCollector -> entityCollector.merge() ).flatMap( + entityCollector -> Observable.from( entityCollector.getResults() ) ); + } ); + + return entityObservable; + } + + + /** + * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this + * state is mutable and difficult to represent functionally + */ + private static final class EntityVerifier { + + private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class ); + private List<FilterResult<Entity>> results = new ArrayList<>(); + + private final List<FilterResult<Id>> candidateResults; + private final EntitySet entitySet; + + + public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) { + this.entitySet = entitySet; + this.candidateResults = candidateResults; + this.results = new ArrayList<>( entitySet.size() ); + } + + + /** + * Merge our candidates and our entity set into results + */ + public void merge() { + + for ( final FilterResult<Id> candidateResult : candidateResults ) { + validate( candidateResult ); + } + } + + + public List<FilterResult<Entity>> getResults() { + return results; + } + + + private void validate( final FilterResult<Id> filterResult ) { + + final Id candidateId = filterResult.getValue(); + + + final MvccEntity entity = entitySet.getEntity( candidateId ); + + + //doesn't exist warn and drop + if ( entity == null || !entity.getEntity().isPresent() ) { + logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra." + + " Ignoring since this could be a region sync issue", candidateId ); + + + //TODO trigger an audit after a fail count where we explicitly try to repair from other regions + + return; + } + + //it exists, add it + + final Entity returnEntity = entity.getEntity().get(); + + final Optional<EdgePath> parent = filterResult.getPath(); + + final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent ); + + results.add( toReturn ); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java index 12306fd..7371579 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java @@ -21,8 +21,9 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph; import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; import org.apache.usergrid.corepersistence.pipeline.read.Filter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; @@ -42,7 +43,7 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeType /** * Command for reading graph edges on a connection */ -public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> { +public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> { private final GraphManagerFactory graphManagerFactory; private final String connectionName; @@ -61,8 +62,9 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I } + @Override - public Observable<Id> call( final Observable<Id> observable ) { + public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterResultObservable ) { //get the graph manager final GraphManager graphManager = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() ); @@ -73,20 +75,18 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I //return all ids that are emitted from this edge - return observable.flatMap( id -> { + return filterResultObservable.flatMap( idFilterResult -> { //set our our constant state final Optional<Edge> startFromCursor = getSeekValue(); + final Id id = idFilterResult.getValue(); final SimpleSearchByIdType search = new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, entityType, startFromCursor ); - /** - * TODO, pass a message with pointers to our cursor values to be generated later - */ - return graphManager.loadEdgesFromSourceByType( search ).doOnNext( edge -> setCursor( edge ) ).map( - edge -> edge.getTargetNode() ); + return graphManager.loadEdgesFromSourceByType( search ).map( + edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() )); } ); } @@ -95,4 +95,6 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I protected CursorSerializer<Edge> getCursorSerializer() { return EdgeCursorSerializer.INSTANCE; } + + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java index f3c2a9c..0260d1d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java @@ -26,12 +26,10 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import org.apache.usergrid.corepersistence.pipeline.PipelineResult; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.persistence.EntityFactory; import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.collection.MvccEntity; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -51,13 +49,17 @@ public class ObservableQueryExecutor implements QueryExecutor { public Iterator<Results> iterator; - public ObservableQueryExecutor( final Observable<PipelineResult<ResultsPage>> resultsObservable ) { + public ObservableQueryExecutor( final Observable<ResultsPage> resultsObservable) { //map to our old results objects, return a default empty if required this.resultsObservable = resultsObservable.map( resultsPage -> createResults( resultsPage ) ).defaultIfEmpty( new Results() ); } - + /** + * + * @param cpEntity + * @return + */ private org.apache.usergrid.persistence.Entity mapEntity( final Entity cpEntity ) { @@ -72,9 +74,8 @@ public class ObservableQueryExecutor implements QueryExecutor { return entity; } - private Results createResults( final PipelineResult<ResultsPage> pipelineResults ){ + private Results createResults( final ResultsPage resultsPage ){ - final ResultsPage resultsPage = pipelineResults.getResult(); final List<Entity> entityList = resultsPage.getEntityList(); final List<org.apache.usergrid.persistence.Entity> resultsEntities = new ArrayList<>( entityList.size() ); @@ -85,10 +86,15 @@ public class ObservableQueryExecutor implements QueryExecutor { final Results results = Results.fromEntities( resultsEntities ); - if(pipelineResults.getCursor().isPresent()) { - results.setCursor( pipelineResults.getCursor().get() ); - } + //add the cursor if our limit is the same + if(resultsPage.hasMoreResults()) { + final Optional<String> cursor = resultsPage.getResponseCursor().encodeAsString(); + + if ( cursor.isPresent() ) { + results.setCursor( cursor.get() ); + } + } return results; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java index 6e15d54..fd65ebf 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java @@ -24,11 +24,11 @@ package org.apache.usergrid.corepersistence.pipeline.cursor; import org.junit.Test; +import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticsearchCursorSerializer; import org.apache.usergrid.corepersistence.pipeline.read.graph.EdgeCursorSerializer; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.impl.SimpleEdge; -import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge; import com.google.common.base.Optional; @@ -41,7 +41,10 @@ public class CursorTest { @Test public void testCursors(){ - ResponseCursor responseCursor = new ResponseCursor(); + + + + //test encoding edge @@ -58,13 +61,18 @@ public class CursorTest { final Integer query2 = 20; - responseCursor.setCursor( 0, edge1, EdgeCursorSerializer.INSTANCE ); - responseCursor.setCursor( 1, query1, ElasticsearchCursorSerializer.INSTANCE ); + final EdgePath<Integer> filter3Path = new EdgePath<>( 3, query2, ElasticsearchCursorSerializer.INSTANCE, Optional.absent() ); + + final EdgePath<Edge> filter2Path = new EdgePath<Edge>(2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path )); + + final EdgePath<Integer> filter1Path = new EdgePath<>( 1, query1, ElasticsearchCursorSerializer.INSTANCE, Optional.of(filter2Path) ); + + final EdgePath<Edge> filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) ); + - responseCursor.setCursor(2, edge2, EdgeCursorSerializer.INSTANCE); - responseCursor.setCursor(3, query2, ElasticsearchCursorSerializer.INSTANCE); + ResponseCursor responseCursor = new ResponseCursor( Optional.of(filter0Path) ); final Optional<String> cursor = responseCursor.encodeAsString(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java index 1e63df1..a157e47 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java @@ -42,14 +42,10 @@ public class CandidateResults implements Iterable<CandidateResult> { private final List<CandidateResult> candidates; private final Collection<SelectFieldMapping> getFieldMappings; - private final SearchEdge searchEdge; - - public CandidateResults( List<CandidateResult> candidates, final Collection<SelectFieldMapping> getFieldMappings, - final SearchEdge searchEdge ) { + public CandidateResults( List<CandidateResult> candidates, final Collection<SelectFieldMapping> getFieldMappings) { this.candidates = candidates; this.getFieldMappings = getFieldMappings; - this.searchEdge = searchEdge; offset = Optional.absent(); } @@ -91,11 +87,6 @@ public class CandidateResults implements Iterable<CandidateResult> { } - public SearchEdge getSearchEdge() { - return searchEdge; - } - - /** * Get the candidates * @return http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java index 2d4696a..5b67060 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java @@ -169,7 +169,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { } failureMonitor.success(); - return parseResults(searchResponse, parsedQuery, searchEdge, limit, offset); + return parseResults(searchResponse, parsedQuery, limit, offset); } @@ -227,7 +227,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { /** * Parse the results and return the canddiate results */ - private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query, final SearchEdge searchEdge, + private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query, final int limit, final int from ) { final SearchHits searchHits = searchResponse.getHits(); @@ -244,8 +244,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { candidates.add( candidateResult ); } - final CandidateResults candidateResults = new CandidateResults( candidates, query.getSelectFieldMappings(), - searchEdge ); + final CandidateResults candidateResults = new CandidateResults( candidates, query.getSelectFieldMappings()); // >= seems odd. However if we get an overflow, we need to account for it. if ( hits.length >= limit ) {
