http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java index 84654aa..91773c4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java @@ -23,12 +23,12 @@ package org.apache.usergrid.corepersistence.pipeline.read.collect; import java.util.ArrayList; import java.util.List; +import org.apache.usergrid.corepersistence.pipeline.PipelineContext; import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor; -import org.apache.usergrid.corepersistence.pipeline.read.Collector; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; -import org.apache.usergrid.persistence.model.entity.Entity; import com.google.common.base.Optional; @@ -37,21 +37,32 @@ import rx.Observable; /** * Takes entities and collects them into results. This mostly exists for 1.0 compatibility. Eventually this will - * become the only collector in our pipline and be used when rendering results, both on GET, PUT and POST. + * become the only collector in our pipeline and be used when rendering results, both on GET, PUT and POST. 
+ * + * + * + * @param T the type of element to be collected */ -public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage> - implements Collector<Entity, ResultsPage> { +public class ResultsPageCollector<T> extends AbstractFilter<FilterResult<T>, ResultsPage<T>> { + + + protected PipelineContext pipelineContext; + + + @Override + public void setContext( final PipelineContext pipelineContext ) { + this.pipelineContext = pipelineContext; + } + @Override - public Observable<ResultsPage> call( final Observable<FilterResult<Entity>> filterResultObservable ) { + public Observable<ResultsPage<T>> call( final Observable<FilterResult<T>> filterResultObservable ) { final int limit = pipelineContext.getLimit(); return filterResultObservable.buffer( limit ).flatMap( buffer -> Observable.from( buffer ).collect( - () -> new ResultsPageWithCursorCollector( limit ), ( collector, entity ) -> { - collector.add( entity ); - } ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results, + () -> new ResultsPageWithCursorCollector( limit ), ( collector, element ) -> collector.add( element ) ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results, new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit() ) ); } @@ -59,10 +70,10 @@ public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage> /** * A collector that will aggregate our results together */ - private static class ResultsPageWithCursorCollector { + private class ResultsPageWithCursorCollector { - private final List<Entity> results; + private final List<T> results; private Optional<EdgePath> lastPath; @@ -72,7 +83,7 @@ public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage> } - public void add( final FilterResult<Entity> result ) { + public void add( final FilterResult<T> result ) { this.results.add( result.getValue() ); this.lastPath = result.getPath(); }
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java deleted file mode 100644 index f403e21..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; - - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; -import org.apache.usergrid.corepersistence.pipeline.read.Filter; -import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.metrics.ObservableTimer; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.CandidateResult; -import org.apache.usergrid.persistence.index.CandidateResults; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.index.SearchTypes; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.codahale.metrics.Timer; -import com.google.common.base.Optional; - -import rx.Observable; - - -/** - * Command for reading graph edges - */ -public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id, Candidate, Integer> - implements Filter<Id, Candidate> { - - private static final Logger log = LoggerFactory.getLogger( AbstractElasticSearchFilter.class ); - - private final EntityIndexFactory entityIndexFactory; - private final String query; - private final Timer searchTimer; - - - /** - * Create a new instance of our command - */ - public AbstractElasticSearchFilter( final EntityIndexFactory entityIndexFactory, - final MetricsFactory metricsFactory, final String query ) { - this.entityIndexFactory = entityIndexFactory; - this.query = query; - this.searchTimer = metricsFactory.getTimer( AbstractElasticSearchFilter.class, "query" ); - } - - - @Override - public Observable<FilterResult<Candidate>> call( final Observable<FilterResult<Id>> 
observable ) { - - //get the graph manager - final ApplicationEntityIndex applicationEntityIndex = - entityIndexFactory.createApplicationEntityIndex( pipelineContext.getApplicationScope() ); - - - final int limit = pipelineContext.getLimit(); - - - final SearchTypes searchTypes = getSearchTypes(); - - - //return all ids that are emitted from this edge - return observable.flatMap( idFilterResult -> { - - final SearchEdge searchEdge = getSearchEdge( idFilterResult.getValue() ); - - - final Observable<FilterResult<Candidate>> candidates = Observable.create( subscriber -> { - - //our offset to our start value. This will be set the first time we emit - //after we receive new ids, we want to reset this to 0 - //set our our constant state - final Optional<Integer> startFromCursor = getSeekValue(); - - final int startOffset = startFromCursor.or( 0 ); - - int currentOffSet = startOffset; - - subscriber.onStart(); - - //emit while we have values from ES and someone is subscribed - while ( !subscriber.isUnsubscribed() ) { - - - try { - final CandidateResults candidateResults = - applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet ); - - - - for( CandidateResult candidateResult: candidateResults){ - - //our subscriber unsubscribed, break out - if(subscriber.isUnsubscribed()){ - return; - } - - final Candidate candidate = new Candidate( candidateResult, searchEdge ); - - final FilterResult<Candidate> - result = createFilterResult( candidate, currentOffSet, idFilterResult.getPath() ); - - subscriber.onNext( result ); - - currentOffSet++; - } - - /** - * No candidates, we're done - */ - if (candidateResults.size() < limit) { - subscriber.onCompleted(); - return; - } - - } - catch ( Throwable t ) { - - log.error( "Unable to search candidates", t ); - subscriber.onError( t ); - } - } - } ); - - - //add a timer around our observable - ObservableTimer.time( candidates, searchTimer ); - - return candidates; - } ); - } - - - @Override - protected 
CursorSerializer<Integer> getCursorSerializer() { - return ElasticsearchCursorSerializer.INSTANCE; - } - - - /** - * Get the search edge from the id - */ - protected abstract SearchEdge getSearchEdge( final Id id ); - - /** - * Get the search types - */ - protected abstract SearchTypes getSearchTypes(); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java deleted file mode 100644 index ab9d5d9..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; - - -import org.apache.usergrid.persistence.index.CandidateResult; -import org.apache.usergrid.persistence.index.SearchEdge; - - -/** - * Create a candidate. This holds the original candidate, as well as the search scope it was found it - */ -public class Candidate { - - private final CandidateResult candidateResult; - private final SearchEdge searchEdge; - - - /** - * Create a new Candidate for further processing - * @param candidateResult The candidate result - * @param searchEdge The search edge this was searched on - */ - public Candidate( final CandidateResult candidateResult, final SearchEdge searchEdge ) { - this.candidateResult = candidateResult; - this.searchEdge = searchEdge; - } - - - public CandidateResult getCandidateResult() { - return candidateResult; - } - - - public SearchEdge getSearchEdge() { - return searchEdge; - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java deleted file mode 100644 index 4304b37..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; - - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; -import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; -import org.apache.usergrid.corepersistence.pipeline.read.Filter; -import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.EntitySet; -import org.apache.usergrid.persistence.collection.MvccEntity; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.CandidateResult; -import org.apache.usergrid.persistence.index.EntityIndexBatch; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.fasterxml.uuid.UUIDComparator; -import com.google.common.base.Optional; -import 
com.google.inject.Inject; - -import rx.Observable; - - -/** - * Loads entities from an incoming CandidateResult emissions into entities, then streams them on - * performs internal buffering for efficiency. Note that all entities may not be emitted if our load crosses page boundaries. It is up to the - * collector to determine when to stop streaming entities. - */ -public class CandidateEntityFilter extends AbstractFilter<Candidate, Entity> - implements Filter<Candidate, Entity> { - - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private final EntityIndexFactory entityIndexFactory; - - - @Inject - public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory, - final EntityIndexFactory entityIndexFactory ) { - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - this.entityIndexFactory = entityIndexFactory; - } - - - @Override - public Observable<FilterResult<Entity>> call( - final Observable<FilterResult<Candidate>> candidateResultsObservable ) { - - - /** - * A bit kludgy from old 1.0 -> 2.0 apis. 
Refactor this as we clean up our lower levels and create new results - * objects - */ - - final ApplicationScope applicationScope = pipelineContext.getApplicationScope(); - - final EntityCollectionManager entityCollectionManager = - entityCollectionManagerFactory.createCollectionManager( applicationScope ); - - - final ApplicationEntityIndex applicationIndex = - entityIndexFactory.createApplicationEntityIndex( applicationScope ); - - //buffer them to get a page size we can make 1 network hop - final Observable<FilterResult<Entity>> searchIdSetObservable = candidateResultsObservable.buffer( pipelineContext.getLimit() ) - - //load them - .flatMap( candidateResults -> { - //flatten toa list of ids to load - final Observable<List<Id>> candidateIds = - Observable.from( candidateResults ).map( filterResultCandidate -> filterResultCandidate.getValue().getCandidateResult().getId() ).toList(); - - //load the ids - final Observable<EntitySet> entitySetObservable = - candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) ); - - //now we have a collection, validate our canidate set is correct. - - return entitySetObservable.map( - entitySet -> new EntityVerifier( applicationIndex.createBatch(), entitySet, - candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() ) - .flatMap( - entityCollector -> Observable.from( entityCollector.getResults() ) ); - } ); - - //if we filter all our results, we want to continue to try the next page - return searchIdSetObservable; - } - - - - - /** - * Our collector to collect entities. 
Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally - */ - private static final class EntityVerifier { - - private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class ); - private List<FilterResult<Entity>> results = new ArrayList<>(); - - private final EntityIndexBatch batch; - private final List<FilterResult<Candidate>> candidateResults; - private final EntitySet entitySet; - - - public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet, - final List<FilterResult<Candidate>> candidateResults ) { - this.batch = batch; - this.entitySet = entitySet; - this.candidateResults = candidateResults; - this.results = new ArrayList<>( entitySet.size() ); - } - - - /** - * Merge our candidates and our entity set into results - */ - public void merge() { - - for ( final FilterResult<Candidate> candidateResult : candidateResults ) { - validate( candidateResult ); - } - - batch.execute(); - } - - - public List<FilterResult<Entity>> getResults() { - return results; - } - - - public EntityIndexBatch getBatch() { - return batch; - } - - - private void validate( final FilterResult<Candidate> filterResult ) { - - final Candidate candidate = filterResult.getValue(); - final CandidateResult candidateResult = candidate.getCandidateResult(); - final SearchEdge searchEdge = candidate.getSearchEdge(); - final Id candidateId = candidateResult.getId(); - final UUID candidateVersion = candidateResult.getVersion(); - - - final MvccEntity entity = entitySet.getEntity( candidateId ); - - - //doesn't exist warn and drop - if ( entity == null ) { - logger.warn( - "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra." 
- + " Ignoring since this could be a region sync issue", - candidateId, candidateVersion ); - - - //TODO trigger an audit after a fail count where we explicitly try to repair from other regions - - return; - - } - - - final UUID entityVersion = entity.getVersion(); - final Id entityId = entity.getId(); - - - - - - //entity is newer than ES version, could be an update or the entity is marked as deleted - if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) { - - logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}", - new Object[] { searchEdge, entityId, entityVersion } ); - batch.deindex( searchEdge, entityId, candidateVersion ); - return; - } - - //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to - //remove the ES record, since the read in cass should cause a read repair, just ignore - if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) { - - logger.warn( - "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. 
Repair " - + "should be run", new Object[] { searchEdge, entityId, entityVersion } ); - - //TODO trigger an audit after a fail count where we explicitly try to repair from other regions - - return; - } - - //they're the same add it - - final Entity returnEntity = entity.getEntity().get(); - - final Optional<EdgePath> parent = filterResult.getPath(); - - final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent ); - - results.add( toReturn ); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java deleted file mode 100644 index 0e87141..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; - - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; -import org.apache.usergrid.corepersistence.pipeline.read.Filter; -import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.VersionSet; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.CandidateResult; -import org.apache.usergrid.persistence.index.EntityIndexBatch; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.fasterxml.uuid.UUIDComparator; -import com.google.inject.Inject; - -import rx.Observable; - - -/** - * Responsible for verifying candidate result versions, then emitting the Ids of these versions Input is a batch of - * candidate results, output is a stream of validated Ids - */ -public class CandidateIdFilter extends AbstractFilter<Candidate, Id> implements Filter<Candidate, Id> { - - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private final EntityIndexFactory entityIndexFactory; - - - @Inject - public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory, - final EntityIndexFactory entityIndexFactory ) { - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - this.entityIndexFactory = entityIndexFactory; - } - - - 
@Override - public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) { - - - /** - * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results - * objects - */ - - final ApplicationScope applicationScope = pipelineContext.getApplicationScope(); - - final EntityCollectionManager entityCollectionManager = - entityCollectionManagerFactory.createCollectionManager( applicationScope ); - - - final ApplicationEntityIndex applicationIndex = - entityIndexFactory.createApplicationEntityIndex( applicationScope ); - - final Observable<FilterResult<Id>> searchIdSetObservable = - filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( candidateResults -> { - //flatten toa list of ids to load - final Observable<List<Id>> candidateIds = Observable.from( candidateResults ).map( - candidate -> candidate.getValue().getCandidateResult().getId() ).toList(); - - //load the ids - final Observable<VersionSet> versionSetObservable = - candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) ); - - //now we have a collection, validate our canidate set is correct. - - return versionSetObservable.map( - entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, - candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap( - entityCollector -> Observable.from( entityCollector.collectResults() ) ); - } ); - - return searchIdSetObservable; - } - - - /** - * Map a new cp entity to an old entity. 
May be null if not present - */ - private static final class EntityCollector { - - private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class ); - private List<FilterResult<Id>> results = new ArrayList<>(); - - private final EntityIndexBatch batch; - private final List<FilterResult<Candidate>> candidateResults; - private final VersionSet versionSet; - - - public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet, - final List<FilterResult<Candidate>> candidateResults ) { - this.batch = batch; - this.versionSet = versionSet; - this.candidateResults = candidateResults; - this.results = new ArrayList<>( versionSet.size() ); - } - - - /** - * Merge our candidates and our entity set into results - */ - public void merge() { - - for ( final FilterResult<Candidate> candidateResult : candidateResults ) { - validate( candidateResult ); - } - - batch.execute(); - } - - - public List<FilterResult<Id>> collectResults() { - return results; - } - - - /** - * Validate each candidate results vs the data loaded from cass - */ - private void validate( final FilterResult<Candidate> filterCandidate ) { - - final CandidateResult candidateResult = filterCandidate.getValue().getCandidateResult(); - - final SearchEdge searchEdge = filterCandidate.getValue().getSearchEdge(); - - final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() ); - - final UUID candidateVersion = candidateResult.getVersion(); - - final UUID entityVersion = logEntry.getVersion(); - - final Id entityId = logEntry.getEntityId(); - - //entity is newer than ES version - if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) { - - logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}", - new Object[] { searchEdge, entityId, entityVersion } ); - batch.deindex( searchEdge, entityId, entityVersion ); - return; - } - - //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to 
- //remove the ES record, since the read in cass should cause a read repair, just ignore - if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) { - - logger.warn( - "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair " - + "should be run", new Object[] { searchEdge, entityId, entityVersion } ); - } - - //they're the same add it - - final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath() ); - - results.add( result ); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchCollectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchCollectionFilter.java deleted file mode 100644 index 702b2d9..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchCollectionFilter.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; - - -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.index.SearchTypes; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge; - - -public class ElasticSearchCollectionFilter extends AbstractElasticSearchFilter { - - private final String collectionName; - private final String entityType; - - /** - * Create a new instance of our command - * - * @param entityIndexFactory The entity index factory used to search - * @param metricsFactory The metrics factory for metrics - * @param collectionName The name of the collection - * @param entityType The entity type - */ - @Inject - public ElasticSearchCollectionFilter( final EntityIndexFactory entityIndexFactory, - final MetricsFactory metricsFactory, @Assisted( "query" ) final String query, - @Assisted( "collectionName" ) final String collectionName, - @Assisted( "entityType" ) final String entityType ) { - super( entityIndexFactory, metricsFactory, query ); - this.collectionName = collectionName; - this.entityType = entityType; - } - - - - @Override - protected SearchTypes getSearchTypes() { - final SearchTypes types = SearchTypes.fromTypes( entityType ); - - return types; - } - - - @Override - protected SearchEdge getSearchEdge( final Id incomingId ) { - final SearchEdge searchEdge = createCollectionSearchEdge( incomingId, collectionName ); - - return searchEdge; - } - - - -} 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchConnectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchConnectionFilter.java deleted file mode 100644 index cc40530..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchConnectionFilter.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; - - -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.index.SearchTypes; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchEdge; - - -public class ElasticSearchConnectionFilter extends AbstractElasticSearchFilter { - - - private final String connectionName; - private final Optional<String> connectedEntityType; - - - /** - * Create a new instance of our command - */ - @Inject - public ElasticSearchConnectionFilter( final EntityIndexFactory entityIndexFactory, - final MetricsFactory metricsFactory, @Assisted( "query" ) final String query, - @Assisted( "connectionName" ) final String connectionName, - @Assisted( "connectedEntityType" ) - final Optional<String> connectedEntityType ) { - super( entityIndexFactory, metricsFactory, query ); - - this.connectionName = connectionName; - this.connectedEntityType = connectedEntityType; - } - - - @Override - protected SearchTypes getSearchTypes() { - final SearchTypes searchTypes = SearchTypes.fromNullableTypes( connectedEntityType.orNull() ); - - return searchTypes; - } - - - @Override - protected SearchEdge getSearchEdge( final Id id ) { - final SearchEdge searchEdge = createConnectionSearchEdge( id, connectionName ); - - return searchEdge; - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java deleted file mode 100644 index a4e7746..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; - - -import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer; -import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.graph.impl.SimpleEdge; - - -/** - * ElasticSearch cursor serializer - */ -public class ElasticsearchCursorSerializer extends AbstractCursorSerializer<Integer> { - - - public static final ElasticsearchCursorSerializer INSTANCE = new ElasticsearchCursorSerializer(); - - @Override - protected Class<Integer> getType() { - return Integer.class; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Elasticsearchdiagram.jpg ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Elasticsearchdiagram.jpg b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Elasticsearchdiagram.jpg deleted file mode 100644 index 08970e3..0000000 Binary files a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Elasticsearchdiagram.jpg and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java deleted file mode 100644 index 42b352b..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java 
+++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.graph; - - -import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; -import org.apache.usergrid.corepersistence.pipeline.read.Filter; -import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; -import org.apache.usergrid.persistence.graph.GraphManager; -import org.apache.usergrid.persistence.graph.GraphManagerFactory; -import org.apache.usergrid.persistence.graph.SearchByEdge; -import org.apache.usergrid.persistence.graph.SearchByEdgeType; -import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import rx.Observable; - - -/** - * Filter should take and Id and a graph edge, and ensure the connection between the two exists - */ -public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<Id, Id> implements - Filter<Id, Id> { - - private final GraphManagerFactory graphManagerFactory; - private final Id targetId; - - - @Inject - 
public AbstractReadGraphEdgeByIdFilter( final GraphManagerFactory graphManagerFactory, @Assisted final Id - targetId ) { - this.graphManagerFactory = graphManagerFactory; - this.targetId = targetId; - } - - - @Override - public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) { - - final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() ); - - return filterValueObservable.flatMap( filterValue -> { - final String edgeTypeName = getEdgeName(); - final Id id = filterValue.getValue(); - - //create our search - final SearchByEdge searchByEdge = - new SimpleSearchByEdge( id, edgeTypeName, targetId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, - Optional.absent() ); - - //load the versions of the edge, take the first since that's all we need to validate existence, then emit the target node - return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ).map( targetId -> new FilterResult<>(targetId, filterValue.getPath())); - } ); - } - - - /** - * Get the name of the edge to be used in the seek - */ - protected abstract String getEdgeName(); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java deleted file mode 100644 index 303bc5b..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.graph; - - -import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; -import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; -import org.apache.usergrid.corepersistence.pipeline.read.Filter; -import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; -import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.graph.GraphManager; -import org.apache.usergrid.persistence.graph.GraphManagerFactory; -import org.apache.usergrid.persistence.graph.SearchByEdgeType; -import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; - -import rx.Observable; - - -/** - * Command for reading graph edges - */ -public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> { - - private final GraphManagerFactory graphManagerFactory; - - - /** - * Create a new instance of our command - */ - public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) { - this.graphManagerFactory = 
graphManagerFactory; - } - - - @Override - public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) { - - - //get the graph manager - final GraphManager graphManager = - graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() ); - - - final String edgeName = getEdgeTypeName(); - final EdgeState edgeCursorState = new EdgeState(); - - - //return all ids that are emitted from this edge - return previousIds.flatMap( previousFilterValue -> { - - //set our our constant state - final Optional<Edge> startFromCursor = getSeekValue(); - final Id id = previousFilterValue.getValue(); - - - final SimpleSearchByEdgeType search = - new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, - startFromCursor ); - - /** - * TODO, pass a message with pointers to our cursor values to be generated later - */ - return graphManager.loadEdgesFromSource( search ) - //set the edge state for cursors - .doOnNext( edge -> edgeCursorState.update( edge ) ) - - //map our id from the target edge and set our cursor every edge we traverse - .map( edge -> createFilterResult( edge.getTargetNode(), edgeCursorState.getCursorEdge(), - previousFilterValue.getPath() ) ); - } ); - } - - - @Override - protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue, - final Optional<EdgePath> parent ) { - - //if it's our first pass, there's no cursor to generate - if(cursorValue == null){ - return new FilterResult<>( emit, parent ); - } - - return super.createFilterResult( emit, cursorValue, parent ); - } - - - @Override - protected CursorSerializer<Edge> getCursorSerializer() { - return EdgeCursorSerializer.INSTANCE; - } - - - /** - * Get the edge type name we should use when traversing - */ - protected abstract String getEdgeTypeName(); - - - /** - * Wrapper class. Because edges seek > the last returned, we need to keep our n-1 value. 
This will be our cursor We - * always try to seek to the same position as we ended. Since we don't deal with a persistent read result, if we - * seek to a value = to our last, we may skip data. - */ - private final class EdgeState { - - private Edge cursorEdge = null; - private Edge currentEdge = null; - - - /** - * Update the pointers - */ - private void update( final Edge newEdge ) { - cursorEdge = currentEdge; - currentEdge = newEdge; - } - - - /** - * Get the edge to use in cursors for resume - */ - private Edge getCursorEdge() { - return cursorEdge; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java deleted file mode 100644 index 769a67e..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.graph; - - -import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer; -import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.graph.impl.SimpleEdge; - - -/** - * Edge cursor serializer - */ -public class EdgeCursorSerializer extends AbstractCursorSerializer<Edge> { - - - public static final EdgeCursorSerializer INSTANCE = new EdgeCursorSerializer(); - - @Override - protected Class<SimpleEdge> getType() { - return SimpleEdge.class; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java deleted file mode 100644 index 5a0e026..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.graph; - - -import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; -import org.apache.usergrid.corepersistence.pipeline.read.Filter; -import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import rx.Observable; - - -/** - * This command is a stopgap to make migrating 1.0 code easier. Once full traversal has been implemented, this should - * be removed - */ -public class EntityIdFilter extends AbstractFilter<Id, Id> implements Filter<Id, Id> { - - private final Id entityId; - - - @Inject - public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;} - - - - @Override - public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) { - //ignore what our input was, and simply emit the id specified - return filterValueObservable.map( idFilterResult -> new FilterResult( entityId, idFilterResult.getPath() )); - - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java deleted file mode 100644 index d598a2e..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline.read.graph; - - -import java.util.ArrayList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; -import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; -import org.apache.usergrid.corepersistence.pipeline.read.Filter; -import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.EntitySet; -import org.apache.usergrid.persistence.collection.MvccEntity; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; -import com.google.inject.Inject; - -import rx.Observable; - - -/** - * Loads entities from a set of Ids. 
- * - * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification - */ -public class EntityLoadFilter extends AbstractFilter<Id, Entity> implements Filter<Id, Entity> { - - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - - - @Inject - public EntityLoadFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) { - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - } - - - @Override - public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) { - - - final EntityCollectionManager entityCollectionManager = - entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() ); - - //it's more efficient to make 1 network hop to get everything, then drop our results if required - final Observable<FilterResult<Entity>> entityObservable = - filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> { - - final Observable<EntitySet> entitySetObservable = - Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList() - .flatMap( ids -> entityCollectionManager.load( ids ) ); - - - //now we have a collection, validate our canidate set is correct. - - return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) ) - .doOnNext( entityCollector -> entityCollector.merge() ).flatMap( - entityCollector -> Observable.from( entityCollector.getResults() ) ); - } ); - - return entityObservable; - } - - - /** - * Our collector to collect entities. 
Not quite a true collector, but works within our operational flow as this - * state is mutable and difficult to represent functionally - */ - private static final class EntityVerifier { - - private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class ); - private List<FilterResult<Entity>> results = new ArrayList<>(); - - private final List<FilterResult<Id>> candidateResults; - private final EntitySet entitySet; - - - public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) { - this.entitySet = entitySet; - this.candidateResults = candidateResults; - this.results = new ArrayList<>( entitySet.size() ); - } - - - /** - * Merge our candidates and our entity set into results - */ - public void merge() { - - for ( final FilterResult<Id> candidateResult : candidateResults ) { - validate( candidateResult ); - } - } - - - public List<FilterResult<Entity>> getResults() { - return results; - } - - - private void validate( final FilterResult<Id> filterResult ) { - - final Id candidateId = filterResult.getValue(); - - - final MvccEntity entity = entitySet.getEntity( candidateId ); - - - //doesn't exist warn and drop - if ( entity == null || !entity.getEntity().isPresent() ) { - logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra." 
- + " Ignoring since this could be a region sync issue", candidateId ); - - - //TODO trigger an audit after a fail count where we explicitly try to repair from other regions - - return; - } - - //it exists, add it - - final Entity returnEntity = entity.getEntity().get(); - - final Optional<EdgePath> parent = filterResult.getPath(); - - final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent ); - - results.add( toReturn ); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/GraphDiagram.jpg ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/GraphDiagram.jpg b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/GraphDiagram.jpg deleted file mode 100644 index c0308bd..0000000 Binary files a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/GraphDiagram.jpg and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java deleted file mode 100644 index da6ad29..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.graph; - - -import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.persistence.graph.GraphManagerFactory; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - - -/** - * Read an edge in the graph to verify it's existence by id - */ -public class ReadGraphCollectionByIdFilter extends AbstractReadGraphEdgeByIdFilter{ - - private final String collectionName; - - @Inject - public ReadGraphCollectionByIdFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName, @Assisted final Id targetId ) { - super( graphManagerFactory, targetId ); - this.collectionName = collectionName; - } - - - @Override - protected String getEdgeName() { - return CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java deleted file mode 100644 index 91ae7c3..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline.read.graph; - - -import org.apache.usergrid.persistence.graph.GraphManagerFactory; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromCollectionName; - - -/** - * Command for reading graph edges on a collection - */ -public class ReadGraphCollectionFilter extends AbstractReadGraphFilter { - - private final String collectionName; - - - /** - * Create a new instance of our command - */ - @Inject - public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName ) { - super( graphManagerFactory ); - this.collectionName = collectionName; - } - - - @Override - protected String getEdgeTypeName() { - return getEdgeTypeFromCollectionName( collectionName ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java deleted file mode 100644 index 4756d33..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.graph; - - -import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.persistence.graph.GraphManagerFactory; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - - -/** - * Read an edge in the graph to verify it's existence by id - */ -public class ReadGraphConnectionByIdFilter extends AbstractReadGraphEdgeByIdFilter{ - - private final String connectionName; - - @Inject - public ReadGraphConnectionByIdFilter( final GraphManagerFactory graphManagerFactory, - @Assisted final String connectionName, @Assisted final Id targetId ) { - super( graphManagerFactory, targetId ); - this.connectionName = connectionName; - } - - - @Override - protected String getEdgeName() { - return CpNamingUtils.getEdgeTypeFromConnectionType( connectionName ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java deleted file mode 100644 index 7371579..0000000 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.graph; - - -import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; -import org.apache.usergrid.corepersistence.pipeline.read.Filter; -import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; -import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.graph.GraphManager; -import org.apache.usergrid.persistence.graph.GraphManagerFactory; -import org.apache.usergrid.persistence.graph.SearchByEdgeType; -import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import rx.Observable; - -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType; - - -/** - * Command for reading
graph edges on a connection, searching by connection name together with an entity type - */ -public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> { - - private final GraphManagerFactory graphManagerFactory; - private final String connectionName; - private final String entityType; - - - /** - * Create a filter for the given connection name and entity type; both arrive as named {@code @Assisted} values - */ - @Inject - public ReadGraphConnectionByTypeFilter( final GraphManagerFactory graphManagerFactory, - @Assisted("connectionName") final String connectionName, @Assisted("entityType") final String entityType ) { - this.graphManagerFactory = graphManagerFactory; - this.connectionName = connectionName; - this.entityType = entityType; - } - - - /** For every incoming id, loads the edges from that source by type and emits the target node of each edge */ - @Override - public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterResultObservable ) { - - //get the graph manager for the application scope of this pipeline - final GraphManager graphManager = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() ); - - - - final String edgeName = getEdgeTypeFromConnectionType( connectionName ); - - - //emit the target node of every edge found for each incoming id - return filterResultObservable.flatMap( idFilterResult -> { - - //set up our constant state - final Optional<Edge> startFromCursor = getSeekValue(); - final Id id = idFilterResult.getValue(); - - final SimpleSearchByIdType search = - new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, - entityType, startFromCursor ); - - return graphManager.loadEdgesFromSourceByType( search ).map( - edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() )); - } ); - } - - /** Returns the shared EdgeCursorSerializer instance */ - @Override - protected CursorSerializer<Edge> getCursorSerializer() { - return EdgeCursorSerializer.INSTANCE; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java ---------------------------------------------------------------------- diff --git
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java deleted file mode 100644 index 0d4971b..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline.read.graph; - - -import org.apache.usergrid.persistence.graph.GraphManagerFactory; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType; - - -/** - * Graph read filter whose edge type name is derived from a connection name - */ -public class ReadGraphConnectionFilter extends AbstractReadGraphFilter { - - private final String connectionName; - - - /** - * Create a filter for the given connection name; the graph manager factory is passed on to the superclass - */ - @Inject - public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String connectionName ) { - super( graphManagerFactory ); - this.connectionName = connectionName; - } - - /** Returns the result of getEdgeTypeFromConnectionType for this filter's connection name */ - @Override - protected String getEdgeTypeName() { - return getEdgeTypeFromConnectionType( connectionName ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java new file mode 100644 index 0000000..eaf74c1 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.search; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.metrics.ObservableTimer; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.index.CandidateResults; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.SearchTypes; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.codahale.metrics.Timer; +import com.google.common.base.Optional; + +import rx.Observable; + + +/** + * Filter that searches the entity index once per incoming id and emits every candidate it finds + */ +public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id, Candidate, Integer> { + + private static final Logger log = LoggerFactory.getLogger( AbstractElasticSearchFilter.class ); + + private final EntityIndexFactory entityIndexFactory; + private final String query; + private final Timer searchTimer; + + + /** + * Create the filter for the given query; the search timer is obtained from the metrics factory under the name "query" + */ + public AbstractElasticSearchFilter( final EntityIndexFactory entityIndexFactory, + final
MetricsFactory metricsFactory, final String query ) { + this.entityIndexFactory = entityIndexFactory; + this.query = query; + this.searchTimer = metricsFactory.getTimer( AbstractElasticSearchFilter.class, "query" ); + } + + /** For each incoming id, pages through the index search and emits one FilterResult per candidate until a page comes back smaller than the limit */ + @Override + public Observable<FilterResult<Candidate>> call( final Observable<FilterResult<Id>> observable ) { + + //get the entity index for our application scope + final ApplicationEntityIndex applicationEntityIndex = + entityIndexFactory.createApplicationEntityIndex( pipelineContext.getApplicationScope() ); + + + final int limit = pipelineContext.getLimit(); + + + final SearchTypes searchTypes = getSearchTypes(); + + + //run the search once for every id we receive + return observable.flatMap( idFilterResult -> { + + final SearchEdge searchEdge = getSearchEdge( idFilterResult.getValue() ); + + + final Observable<FilterResult<Candidate>> candidates = Observable.create( subscriber -> { + + //start from the seek value when one is present, otherwise from offset 0. + //currentOffSet advances by one for every candidate we emit and is handed + //to createFilterResult together with that candidate + final Optional<Integer> startFromCursor = getSeekValue(); + + final int startOffset = startFromCursor.or( 0 ); + + int currentOffSet = startOffset; + + subscriber.onStart(); + + //emit while we have values from ES and someone is subscribed + while ( !subscriber.isUnsubscribed() ) { + + + try { + final CandidateResults candidateResults = + applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet ); + + + + for( CandidateResult candidateResult: candidateResults){ + + //our subscriber unsubscribed, break out + if(subscriber.isUnsubscribed()){ + return; + } + + final Candidate candidate = new Candidate( candidateResult, searchEdge ); + + final FilterResult<Candidate> + result = createFilterResult( candidate, currentOffSet, idFilterResult.getPath() ); + + subscriber.onNext( result ); + + currentOffSet++; + } + + /** + * Fewer results than the limit means there is nothing more to fetch, we're done + */ + if (candidateResults.size() < limit) { +
subscriber.onCompleted(); + return; + } + + } + catch ( Throwable t ) { + log.error( "Unable to search candidates", t ); + subscriber.onError( t ); + return; //onError is terminal, so stop searching instead of looping again + } + } + } ); + + + //add a timer around our observable. NOTE(review): anything time() returns is discarded and the untimed candidates observable is returned below; confirm the timer is actually applied + ObservableTimer.time( candidates, searchTimer ); + + return candidates; + } ); + } + + /** Integer cursor values are serialized with the shared ElasticsearchCursorSerializer instance */ + @Override + protected CursorSerializer<Integer> getCursorSerializer() { + return ElasticsearchCursorSerializer.INSTANCE; + } + + + /** + * Get the search edge from the id + */ + protected abstract SearchEdge getSearchEdge( final Id id ); + + /** + * Get the search types + */ + protected abstract SearchTypes getSearchTypes(); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Candidate.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Candidate.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Candidate.java new file mode 100644 index 0000000..7ada4ba --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Candidate.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.search; + + +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.index.SearchEdge; + + +/** + * Holds a candidate result together with the search edge it was found in + */ +public class Candidate { + + private final SearchEdge searchEdge; + private final CandidateResult candidateResult; + + + /** + * Build a Candidate for further processing + * @param candidateResult The candidate result + * @param searchEdge The search edge this was searched on + */ + public Candidate( final CandidateResult candidateResult, final SearchEdge searchEdge ) { + this.searchEdge = searchEdge; + this.candidateResult = candidateResult; + } + + /** The search edge supplied at construction */ + public SearchEdge getSearchEdge() { + return this.searchEdge; + } + + /** The candidate result supplied at construction */ + public CandidateResult getCandidateResult() { + return this.candidateResult; + } +}