http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java new file mode 100644 index 0000000..56e1c1c --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; + + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; +import org.apache.usergrid.corepersistence.pipeline.PipelineOperation; +import org.apache.usergrid.corepersistence.pipeline.read.Filter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.VersionSet; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.index.EntityIndexBatch; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.fasterxml.uuid.UUIDComparator; +import com.google.inject.Inject; + +import rx.Observable; + + +/** + * Responsible for verifying candidate result versions, then emitting the Ids of these versions + * Input is a batch of candidate results, output is a stream of validated Ids + */ +public class CandidateIdFilter extends AbstractFilter<Candidate, Id> + implements Filter<Candidate, Id> { + + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final EntityIndexFactory entityIndexFactory; + + + @Inject + public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory, + final EntityIndexFactory entityIndexFactory ) { + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.entityIndexFactory = entityIndexFactory; + } + + + + @Override + public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) { + + + /** + * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results + * objects + */ + + final ApplicationScope applicationScope = pipelineContext.getApplicationScope(); + + final EntityCollectionManager entityCollectionManager = + entityCollectionManagerFactory.createCollectionManager( applicationScope ); + + + final ApplicationEntityIndex applicationIndex = + entityIndexFactory.createApplicationEntityIndex( applicationScope ); + + final Observable<FilterResult<Id>> searchIdSetObservable = filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( + candidateResults -> { + //flatten toa list of ids to load + final Observable<List<Id>> candidateIds = + Observable.from( candidateResults ).map( candidate -> candidate.getValue().getCandidateResult().getId() ) + .toList(); + + //load the ids + final Observable<VersionSet> versionSetObservable = + candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) ); + + //now we have a collection, validate our canidate set is correct. + + return versionSetObservable.map( + entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) ) + .doOnNext( entityCollector -> entityCollector.merge() ).flatMap( + entityCollector -> Observable.from( entityCollector.collectResults() ) ); + } ); + + return searchIdSetObservable; + } + + + + + + /** + * Map a new cp entity to an old entity. May be null if not present + */ + private static final class EntityCollector { + + private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class ); + private List<FilterResult<Id>> results = new ArrayList<>(); + + private final EntityIndexBatch batch; + private final List<FilterResult<Candidate>> candidateResults; + private final VersionSet versionSet; + + + public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet, + final List<FilterResult<Candidate>> candidateResults ) { + this.batch = batch; + this.versionSet = versionSet; + this.candidateResults = candidateResults; + this.results = new ArrayList<>( versionSet.size() ); + } + + + /** + * Merge our candidates and our entity set into results + */ + public void merge() { + + for ( final FilterResult<Candidate> candidateResult : candidateResults ) { + validate( candidateResult ); + } + + batch.execute(); + } + + + public List<FilterResult<Id>> collectResults() { + return results; + } + + + /** + * Validate each candidate results vs the data loaded from cass + * @param filterCandidate + */ + private void validate( final FilterResult<Candidate> filterCandidate ) { + + final CandidateResult candidateResult = filterCandidate.getValue().getCandidateResult(); + + final SearchEdge searchEdge = filterCandidate.getValue().getSearchEdge(); + + final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() ); + + final UUID candidateVersion = candidateResult.getVersion(); + + final UUID entityVersion = logEntry.getVersion(); + + final Id entityId = logEntry.getEntityId(); + + //entity is newer than ES version + if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) { + + logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}", + new Object[] { searchEdge, entityId, entityVersion } ); + batch.deindex( searchEdge, entityId, entityVersion ); + return; + } + + //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to + //remove the ES record, since the read in cass should cause a read repair, just ignore + if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) { + + logger.warn( + "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair " + + "should be run", new Object[] { searchEdge, entityId, entityVersion } ); + } + + //they're the same add it + + final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath() ); + + results.add( result ); + } + } + + +}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java deleted file mode 100644 index 465ff22..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; - - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation; -import org.apache.usergrid.corepersistence.pipeline.read.Collector; -import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.EntitySet; -import org.apache.usergrid.persistence.collection.MvccEntity; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.CandidateResult; -import org.apache.usergrid.persistence.index.CandidateResults; -import org.apache.usergrid.persistence.index.EntityIndexBatch; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.fasterxml.uuid.UUIDComparator; -import com.google.inject.Inject; - -import rx.Observable; - - -/** - * Loads entities from an incoming CandidateResults object and return them as results - */ -public class CandidateResultsEntityResultsCollector extends AbstractPipelineOperation<CandidateResults, ResultsPage> - implements Collector<CandidateResults, ResultsPage> { - - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private final EntityIndexFactory entityIndexFactory; - - - @Inject - public CandidateResultsEntityResultsCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory, - final EntityIndexFactory entityIndexFactory ) { - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - this.entityIndexFactory = entityIndexFactory; - } - - - @Override - public Observable<ResultsPage> call( final Observable<CandidateResults> candidateResultsObservable ) { - - - /** - * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results - * objects - */ - - final ApplicationScope applicationScope = pipelineContext.getApplicationScope(); - - final EntityCollectionManager entityCollectionManager = - entityCollectionManagerFactory.createCollectionManager( applicationScope ); - - - final ApplicationEntityIndex applicationIndex = - entityIndexFactory.createApplicationEntityIndex( applicationScope ); - - final Observable<ResultsPage> searchIdSetObservable = candidateResultsObservable.flatMap( candidateResults -> { - //flatten toa list of ids to load - final Observable<List<Id>> candidateIds = - Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList(); - - //load the ids - final Observable<EntitySet> entitySetObservable = - candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) ); - - //now we have a collection, validate our canidate set is correct. - - return entitySetObservable - .map( entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) ) - .doOnNext( entityCollector -> entityCollector.merge() ) - .map( entityCollector -> entityCollector.getResults() ); - } ); - - //if we filter all our results, we want to continue to try the next page - return searchIdSetObservable; - } - - - /** - * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally - */ - private static final class EntityCollector { - - private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class ); - private List<Entity> results = new ArrayList<>(); - - private final EntityIndexBatch batch; - private final CandidateResults candidateResults; - private final EntitySet entitySet; - - - public EntityCollector( final EntityIndexBatch batch, final EntitySet entitySet, - final CandidateResults candidateResults ) { - this.batch = batch; - this.entitySet = entitySet; - this.candidateResults = candidateResults; - this.results = new ArrayList<>( entitySet.size() ); - } - - - /** - * Merge our candidates and our entity set into results - */ - public void merge() { - - for ( final CandidateResult candidateResult : candidateResults ) { - validate( candidateResult ); - } - - batch.execute(); - } - - - public ResultsPage getResults() { - return new ResultsPage( results ); - } - - - public EntityIndexBatch getBatch() { - return batch; - } - - - private void validate( final CandidateResult candidateResult ) { - - final Id candidateId = candidateResult.getId(); - final UUID candidateVersion = candidateResult.getVersion(); - - - final MvccEntity entity = entitySet.getEntity( candidateId ); - - - //doesn't exist warn and drop - if ( entity == null ) { - logger.warn( - "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra." - + " Ignoring since this could be a region sync issue", - candidateId, candidateVersion ); - - - //TODO trigger an audit after a fail count where we explicitly try to repair from other regions - - return; - - } - - - final UUID entityVersion = entity.getVersion(); - - - //entity is newer than ES version, could be an update or the entity is marked as deleted - if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0) { - - final Id entityId = entity.getId(); - final SearchEdge searchEdge = candidateResults.getSearchEdge(); - - logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}", - new Object[] { searchEdge, entityId, entityVersion } ); - batch.deindex( searchEdge, entityId, entityVersion ); - return; - } - - //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to - //remove the ES record, since the read in cass should cause a read repair, just ignore - if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) { - - final Id entityId = entity.getId(); - final SearchEdge searchEdge = candidateResults.getSearchEdge(); - - logger.warn( - "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair " - + "should be run", new Object[] { searchEdge, entityId, entityVersion } ); - - //TODO trigger an audit after a fail count where we explicitly try to repair from other regions - - return; - } - - //they're the same add it - - - results.add( entity.getEntity().get() ); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java deleted file mode 100644 index bb9ab76..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; - - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation; -import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.VersionSet; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.CandidateResult; -import org.apache.usergrid.persistence.index.CandidateResults; -import org.apache.usergrid.persistence.index.EntityIndexBatch; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.fasterxml.uuid.UUIDComparator; -import com.google.inject.Inject; - -import rx.Observable; - - -/** - * Responsible for verifying candidate result versions, then emitting the Ids of these versions - * Input is a batch of candidate results, output is a stream of validated Ids - */ -public class CandidateResultsIdVerifyFilter extends AbstractPipelineOperation<CandidateResults, Id> - implements PipelineOperation<CandidateResults, Id> { - - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private final EntityIndexFactory entityIndexFactory; - - - @Inject - public CandidateResultsIdVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory, - final EntityIndexFactory entityIndexFactory ) { - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - this.entityIndexFactory = entityIndexFactory; - } - - - - @Override - public Observable<Id> call( final Observable<CandidateResults> observable ) { - - - /** - * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results - * objects - */ - - final ApplicationScope applicationScope = pipelineContext.getApplicationScope(); - - final EntityCollectionManager entityCollectionManager = - entityCollectionManagerFactory.createCollectionManager( applicationScope ); - - - final ApplicationEntityIndex applicationIndex = - entityIndexFactory.createApplicationEntityIndex( applicationScope ); - - final Observable<Id> searchIdSetObservable = observable.flatMap( candidateResults -> { - //flatten toa list of ids to load - final Observable<List<Id>> candidateIds = - Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList(); - - //load the ids - final Observable<VersionSet> versionSetObservable = - candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) ); - - //now we have a collection, validate our canidate set is correct. - - return versionSetObservable - .map( entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) ) - .doOnNext( entityCollector -> entityCollector.merge() ) - .flatMap( entityCollector -> Observable.from( entityCollector.collectResults() ) ); - } ); - - return searchIdSetObservable; - } - - - /** - * Map a new cp entity to an old entity. May be null if not present - */ - private static final class EntityCollector { - - private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class ); - private List<Id> results = new ArrayList<>(); - - private final EntityIndexBatch batch; - private final CandidateResults candidateResults; - private final VersionSet versionSet; - - - public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet, - final CandidateResults candidateResults ) { - this.batch = batch; - this.versionSet = versionSet; - this.candidateResults = candidateResults; - this.results = new ArrayList<>( versionSet.size() ); - } - - - /** - * Merge our candidates and our entity set into results - */ - public void merge() { - - for ( final CandidateResult candidateResult : candidateResults ) { - validate( candidateResult ); - } - - batch.execute(); - } - - - public List<Id> collectResults() { - return results; - } - - - /** - * Validate each candidate results vs the data loaded from cass - * @param candidateResult - */ - private void validate( final CandidateResult candidateResult ) { - - final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() ); - - final UUID candidateVersion = candidateResult.getVersion(); - - final UUID entityVersion = logEntry.getVersion(); - - final Id entityId = logEntry.getEntityId(); - - //entity is newer than ES version - if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) { - - final SearchEdge searchEdge = candidateResults.getSearchEdge(); - - logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}", - new Object[] { searchEdge, entityId, entityVersion } ); - batch.deindex( searchEdge, entityId, entityVersion ); - return; - } - - //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to - //remove the ES record, since the read in cass should cause a read repair, just ignore - if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) { - - final SearchEdge searchEdge = candidateResults.getSearchEdge(); - - logger.warn( - "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair " - + "should be run", new Object[] { searchEdge, entityId, entityVersion } ); - } - - //they're the same add it - - results.add( entityId ); - } - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java deleted file mode 100644 index cc96633..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; - - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.usergrid.persistence.EntityRef; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.SimpleEntityRef; -import org.apache.usergrid.persistence.model.entity.Id; - - -public class CollectionRefsVerifier extends VersionVerifier { - - - - @Override - public Results getResults( final Collection<Id> ids ) { - List<EntityRef> refs = new ArrayList<EntityRef>(ids.size()); - for ( Id id : ids ) { - refs.add( new SimpleEntityRef( id.getType(), id.getUuid() ) ); - } - return Results.fromRefList( refs ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java deleted file mode 100644 index 94c91d9..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; - - -import org.apache.usergrid.persistence.Query; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.SearchEdge; - - -/** - * Factory for creating results - */ -public class CollectionResultsLoaderFactoryImpl implements ResultsLoaderFactory { - - private final EntityCollectionManager entityCollectionManager; - private final ApplicationEntityIndex applicationEntityIndex; - - - public CollectionResultsLoaderFactoryImpl( final EntityCollectionManager entityCollectionManager, - final ApplicationEntityIndex applicationEntityIndex ) { - this.entityCollectionManager = entityCollectionManager; - this.applicationEntityIndex = applicationEntityIndex; - } - - - @Override - public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope, - final Query.Level resultsLevel ) { - - ResultsVerifier verifier; - - if ( resultsLevel == Query.Level.REFS ) { - verifier = new CollectionRefsVerifier(); - } - else if ( resultsLevel == Query.Level.IDS ) { - verifier = new IdsVerifier(); - } - else { - verifier = new EntityVerifier( Query.MAX_LIMIT ); - } - - return new FilteringLoader( entityCollectionManager, applicationEntityIndex, verifier, applicationScope, - scope ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java deleted file mode 100644 index 6b7bdde..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; - - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.usergrid.persistence.ConnectionRef; -import org.apache.usergrid.persistence.EntityRef; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl; -import org.apache.usergrid.persistence.model.entity.Id; - -import static org.apache.usergrid.persistence.SimpleEntityRef.ref; - - -/** - * Verifier for creating connections - */ -public class ConnectionRefsVerifier extends VersionVerifier { - - - private final EntityRef ownerId; - private final String connectionType; - - - public ConnectionRefsVerifier( final EntityRef ownerId, final String connectionType ) { - this.ownerId = ownerId; - this.connectionType = connectionType; - } - - @Override - public Results getResults( final Collection<Id> ids ) { - List<ConnectionRef> refs = new ArrayList<>(); - for ( Id id : ids ) { - refs.add( new ConnectionRefImpl( ownerId, connectionType, ref(id.getType(), id.getUuid()) )); - } - - return Results.fromConnections( refs ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java deleted file mode 100644 index 55b95a9..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; - - -import org.apache.usergrid.persistence.EntityRef; -import org.apache.usergrid.persistence.Query; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.SearchEdge; - - -/** - * Factory for creating results - */ -public class ConnectionResultsLoaderFactoryImpl implements ResultsLoaderFactory { - - private final EntityCollectionManager entityCollectionManager; - private final ApplicationEntityIndex applicationEntityIndex; - private final EntityRef ownerId; - private final String connectionType; - - - public ConnectionResultsLoaderFactoryImpl( final EntityCollectionManager entityCollectionManager, - final ApplicationEntityIndex applicationEntityIndex, final EntityRef ownerId, - final String connectionType ) { - this.entityCollectionManager = entityCollectionManager; - this.applicationEntityIndex = applicationEntityIndex; - this.ownerId = ownerId; - this.connectionType = connectionType; - } - - - @Override - public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope, - final Query.Level resultsLevel ) { - - ResultsVerifier verifier; - - if ( resultsLevel == Query.Level.REFS ) { - verifier = new ConnectionRefsVerifier( ownerId, connectionType ); - } - else if ( resultsLevel == Query.Level.IDS ) { - verifier = new ConnectionRefsVerifier( ownerId, connectionType ); - ; - } - else { - verifier = new EntityVerifier( Query.MAX_LIMIT ); - } - - return new FilteringLoader( entityCollectionManager, applicationEntityIndex, verifier, applicationScope, scope ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java deleted file mode 100644 index 6e170f8..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; - - -import java.util.Iterator; -import java.util.NoSuchElementException; - -import org.apache.usergrid.persistence.index.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.Query; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; - -import com.google.common.base.Optional; - - -public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<Results> { - - private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class ); - - private final ResultsLoaderFactory resultsLoaderFactory; - - private final ApplicationScope applicationScope; - - private final ApplicationEntityIndex entityIndex; - - private final SearchEdge indexScope; - - private final SearchTypes types; - - private final String query; - - private final Optional<Integer> setOffsetFromCursor; - - private final int limit; - - private int offset; - - - private Results currentResults; - - private boolean moreToLoad = true; - - - - - public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final ApplicationEntityIndex entityIndex, - final ApplicationScope applicationScope, final SearchEdge indexScope, - final SearchTypes types, final String query, final Optional<Integer> setOffsetFromCursor, final int limit ) { - this.resultsLoaderFactory = resultsLoaderFactory; - this.applicationScope = applicationScope; - this.entityIndex = entityIndex; - this.indexScope = indexScope; - this.types = types; - this.setOffsetFromCursor = setOffsetFromCursor; - - //we must deep copy the query passed. Otherwise we will modify it's state with cursors. Won't fix, not relevant - //once we start subscribing to streams. - this.query = query; - this.limit = limit; - } - - - @Override - public Iterator<Results> iterator() { - return this; - } - - - private void loadNextPage() { - // Because of possible stale entities, which are filtered out by buildResults(), - // we loop until the we've got enough results to satisfy the query limit. - - final int maxQueries = 10; // max re-queries to satisfy query limit - - - Results results = null; - int queryCount = 0; - - - CandidateResults crs = null; - - int newLimit = limit; - - while ( queryCount++ < maxQueries ) { - - crs = entityIndex.search( indexScope, types, query, newLimit , offset); - - - logger.debug( "Calling build results with crs {}", crs ); - results = buildResults( indexScope, crs ); - - /** - * In an edge case where we delete stale entities, we could potentially get less results than expected. - * This will only occur once during the repair phase. - * We need to ensure that we short circuit before we overflow the requested limit during a repair. - */ - if ( crs.isEmpty() || !crs.hasOffset() || results.size() > 0 ) { // no results, no cursor, can't get more - break; - } - - - //we didn't load anything, but there was a cursor, this means a read repair occured. We have to short - //circuit to avoid over returning the result set - - - // need to query for more - // ask for just what we need to satisfy, don't want to exceed limit - newLimit = newLimit - results.size(); - - logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] { - limit, newLimit, queryCount - } ); - } - - //now set our cursor if we have one for the next iteration - if ( results.hasCursor() ) { - moreToLoad = true; - } - - else { - moreToLoad = false; - } - -// -// //set our select subjects into our query if provided -// if(crs != null){ -// query.setSelectSubjects( crs.getGetFieldMappings() ); -// } -// - - //set our current results and the flag - this.currentResults = results; - } - - - - /** - * Build results from a set of candidates, and discard those that represent stale indexes. - * - * @param indexScope The index scope to execute the search on - * @param crs Candidates to be considered for results - */ - private Results buildResults( final SearchEdge indexScope, final CandidateResults crs ) { - - logger.debug( "buildResults() from {} candidates", crs.size() ); - - //get an instance of our results loader - final ResultsLoader resultsLoader = - this.resultsLoaderFactory.getLoader( applicationScope, indexScope, Query.Level.ALL_PROPERTIES ); - - //load the results - final Results results = resultsLoader.loadResults(crs); - - //signal for post processing - resultsLoader.postProcess(); - - //set offset into query - - logger.debug( "Returning results size {}", results.size() ); - - return results; - } - - - @Override - public boolean hasNext() { - - //we've tried to load and it's empty and we have more to load, load the next page - if ( currentResults == null ) { - //there's nothing left to load, nothing to do - if ( !moreToLoad ) { - return false; - } - - //load the page - - loadNextPage(); - } - - - //see if our current results are not null - return currentResults != null; - } - - - @Override - public Results next() { - if ( !hasNext() ) { - throw new NoSuchElementException( "No more results present" ); - } - - final Results toReturn = currentResults; - - currentResults = null; - - return toReturn; - } - - @Override - public void remove() { - throw new RuntimeException("Remove not implemented!!"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java deleted file mode 100644 index d73c731..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. The ASF licenses this file to You - * * under the Apache License, Version 2.0 (the "License"); you may not - * * use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. For additional information regarding - * * copyright in this work, please see the NOTICE file in the top level - * * directory of this distribution. - * - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; - - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; -import org.apache.usergrid.persistence.Entity; -import org.apache.usergrid.persistence.EntityFactory; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntitySet; -import org.apache.usergrid.persistence.collection.MvccEntity; -import org.apache.usergrid.persistence.index.CandidateResult; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.fasterxml.uuid.UUIDComparator; -import com.google.common.base.Optional; - - -/** - * A loader that verifies versions are correct in cassandra and match elasticsearch - */ -public class EntityVerifier implements ResultsVerifier { - - private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class ); - - private EntitySet ids; - - private Map<Id, org.apache.usergrid.persistence.model.entity.Entity> entityMapping; - - - public EntityVerifier( final int maxSize ) { - this.entityMapping = new HashMap<>( maxSize ); - } - - - @Override - public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) { - ids = ecm.load( idsToLoad ).toBlocking().last(); - logger.debug("loadResults() asked for {} ids and got {}", idsToLoad.size(), ids.size()); - } - - - @Override - public boolean isValid( final CandidateResult candidateResult ) { - final Id entityId = candidateResult.getId(); - - final MvccEntity savedEntity = ids.getEntity( entityId ); - - //version wasn't found deindex - if ( savedEntity == null ) { - logger.warn( "Version for Entity {}:{} not found", entityId.getType(), entityId.getUuid() ); - return false; - } - - final UUID candidateVersion = candidateResult.getVersion(); - final UUID savedVersion = savedEntity.getVersion(); - - if ( UUIDComparator.staticCompare( savedVersion, candidateVersion ) > 0 ) { - logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] { - entityId.getUuid(), entityId.getType(), candidateVersion, savedEntity - } ); - - return false; - } - - - final Optional<org.apache.usergrid.persistence.model.entity.Entity> entity = savedEntity.getEntity(); - - if ( !entity.isPresent() ) { - logger.warn( "Entity uuid:{} version v:{} is deleted but indexed, this is a bug ", - entityId.getUuid(), savedEntity.getEntity() ); - return false; - } - - entityMapping.put( entityId, entity.get() ); - - return true; - } - - - @Override - public Results getResults( final Collection<Id> ids ) { - - final List<Entity> ugEntities = new ArrayList<>( ids.size() ); - - for ( final Id id : ids ) { - final org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityMapping.get( id ); - - Entity entity = EntityFactory.newEntity( id.getUuid(), id.getType() ); - - Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity ); - entity.addProperties( entityMap ); - ugEntities.add( entity ); - } - - return Results.fromEntities( ugEntities ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java deleted file mode 100644 index ade64a2..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; - - -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.TreeMap; -import java.util.UUID; - -import javax.annotation.Nullable; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.corepersistence.ManagerCache; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexBatch; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.index.CandidateResult; -import org.apache.usergrid.persistence.index.CandidateResults; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.fasterxml.uuid.UUIDComparator; -import com.google.common.base.Function; -import com.google.common.collect.Collections2; - - -public class FilteringLoader implements ResultsLoader { - - private static final Logger logger = LoggerFactory.getLogger( FilteringLoader.class ); - - private final EntityCollectionManager entityCollectionManager; - private final ResultsVerifier resultsVerifier; - private final ApplicationScope applicationScope; - private final SearchEdge indexScope; - private final EntityIndexBatch indexBatch; - - - /** - * Create an instance of a filter loader - * - * @param entityCollectionManager The entityCollectionManagerFactory - * @param resultsVerifier The verifier to verify the candidate results - * @param applicationScope The application scope to perform the load - * @param indexScope The index scope used in the search - */ - protected FilteringLoader( final EntityCollectionManager entityCollectionManager, final ApplicationEntityIndex applicationEntityIndex, final ResultsVerifier resultsVerifier, - final ApplicationScope applicationScope, final SearchEdge indexScope ) { - - this.entityCollectionManager = entityCollectionManager; - this.resultsVerifier = resultsVerifier; - this.applicationScope = applicationScope; - this.indexScope = indexScope; - - indexBatch = applicationEntityIndex.createBatch(); - } - - - @Override - public Results loadResults( final CandidateResults crs ) { - - - if ( crs.size() == 0 ) { - return new Results(); - } - - - // For each entity, holds the index it appears in our candidates for keeping ordering correct - final Map<Id, Integer> orderIndex = new HashMap<>( crs.size() ); - - // Maps the entity ids to our candidates - final Map<Id, CandidateResult> maxCandidateMapping = new HashMap<>( crs.size() ); - - - final Iterator<CandidateResult> iter = crs.iterator(); - - - // TODO, in this case we're "optimizing" due to the limitations of collection scope. - // Perhaps we should change the API to just be an application, then an "owner" scope? - - // Go through the candidates and group them by scope for more efficient retrieval. - // Also remove duplicates before we even make a network call - for ( int i = 0; iter.hasNext(); i++ ) { - - final CandidateResult currentCandidate = iter.next(); - - final Id entityId = currentCandidate.getId(); - - //check if we've seen this candidate by id - final CandidateResult previousMax = maxCandidateMapping.get( entityId ); - - //its not been seen, save it - if ( previousMax == null ) { - maxCandidateMapping.put( entityId, currentCandidate ); - orderIndex.put( entityId, i ); - continue; - } - - //we have seen it, compare them - - final UUID previousMaxVersion = previousMax.getVersion(); - - final UUID currentVersion = currentCandidate.getVersion(); - - - final CandidateResult toRemove; - final CandidateResult toKeep; - - //current is newer than previous. Remove previous and keep current - if ( UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ) { - toRemove = previousMax; - toKeep = currentCandidate; - } - //previously seen value is newer than current. Remove the current and keep the previously seen value - else { - toRemove = currentCandidate; - toKeep = previousMax; - } - - //this is a newer version, we know we already have a stale entity, add it to be cleaned up - - - //de-index it - logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] { - entityId.getUuid(), entityId.getType(), toRemove.getVersion(), toKeep.getVersion() - } ); - - //deindex this document, and remove the previous maxVersion - //we have to deindex this from our ownerId, since this is what gave us the reference - indexBatch.deindex( indexScope, toRemove ); - - - //TODO, fire the entity repair cleanup task here instead of de-indexing - - //replace the value with a more current version - maxCandidateMapping.put( entityId, toKeep ); - orderIndex.put( entityId, i ); - } - - - //now everything is ordered, and older versions are removed. Batch fetch versions to verify - // existence and correct versions - - final TreeMap<Integer, Id> sortedResults = new TreeMap<>(); - - - final Collection<Id> idsToLoad = - Collections2.transform( maxCandidateMapping.values(), new Function<CandidateResult, Id>() { - @Nullable - @Override - public Id apply( @Nullable final CandidateResult input ) { - //NOTE this is never null, we won't need to check - return input.getId(); - } - } ); - - - //now using the scope, load the collection - - - - //load the results into the loader for this scope for validation - resultsVerifier.loadResults( idsToLoad, entityCollectionManager ); - - //now let the loader validate each candidate. For instance, the "max" in this candidate - //could still be a stale result, so it needs validated - for ( final Id requestedId : idsToLoad ) { - - final CandidateResult cr = maxCandidateMapping.get( requestedId ); - - //ask the loader if this is valid, if not discard it and de-index it - if ( !resultsVerifier.isValid( cr ) ) { - indexBatch.deindex( indexScope, cr ); - continue; - } - - //if we get here we're good, we need to add this to our results - final int candidateIndex = orderIndex.get( requestedId ); - - sortedResults.put( candidateIndex, requestedId ); - } - - - // NOTE DO NOT execute the batch here. - // It changes the results and we need consistent paging until we aggregate all results - return resultsVerifier.getResults( sortedResults.values() ); - } - - - @Override - public void postProcess() { - this.indexBatch.execute(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java deleted file mode 100644 index 4a3bfcd..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; - - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.UUID; - -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.model.entity.Id; - - -public class IdsVerifier extends VersionVerifier { - - @Override - public Results getResults( final Collection<Id> ids ) { - - final List<UUID> returnIds = new ArrayList<>( ids.size() ); - - for ( final Id id : ids ) { - returnIds.add( id.getUuid() ); - } - - - return Results.fromIdList( returnIds ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java deleted file mode 100644 index c2a3e9a..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; - - -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.index.CandidateResults; - - -/** - * Interface for loading results - */ -public interface ResultsLoader { - - /** - * Using the candidate results, load our results. Should filter stale results - * @param crs The candidate result set - * @return Results. Null safe, but may be empty - */ - public Results loadResults( final CandidateResults crs); - - /** - * Post process the load operation - */ - public void postProcess(); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java deleted file mode 100644 index 3ccca1b..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; - - -import org.apache.usergrid.persistence.Query; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.index.SearchEdge; - - -/** - * Factory for creating results - */ -public interface ResultsLoaderFactory { - - /** - * Get the loader for results - * @param applicationScope The application scope used to load results - * @param indexScope The index scope used in the search - * @param - */ - ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge indexScope, - final Query.Level resultsLevel ); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java deleted file mode 100644 index fe72ca2..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; - - -import java.util.Collection; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.index.CandidateResult; -import org.apache.usergrid.persistence.model.entity.Id; - - -public interface ResultsVerifier { - - /** - * Load all the candidate ides for verification - * @param ids The Id's to load - * @param ecm The entity collection manager - */ - public void loadResults(Collection<Id> ids, EntityCollectionManager ecm); - - /** - * Return true if the candidate result is a valid result that should be retained. If it should - * not it should also be removed from the list of possible return values in this loader - * @param candidateResult - */ - public boolean isValid(CandidateResult candidateResult); - - - /** - * Load the result set with the given ids - * @return - */ - public Results getResults(Collection<Id> ids); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java deleted file mode 100644 index c49fb28..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; - - -import java.util.Collection; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.VersionSet; -import org.apache.usergrid.persistence.index.CandidateResult; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.fasterxml.uuid.UUIDComparator; - - -/** - * A loader that verifies versions are correct in Cassandra and match ElasticSearch - */ -public abstract class VersionVerifier implements ResultsVerifier { - - private static final Logger logger = LoggerFactory.getLogger( VersionVerifier.class ); - - private VersionSet ids; - - - @Override - public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) { - ids = ecm.getLatestVersion( idsToLoad ).toBlocking().last(); - } - - - @Override - public boolean isValid( final CandidateResult candidateResult ) { - final Id entityId = candidateResult.getId(); - - final MvccLogEntry version = ids.getMaxVersion( entityId ); - - //version wasn't found ,deindex - if ( version == null ) { - logger.warn( "Version for Entity {}:{} not found", - entityId.getUuid(), entityId.getUuid() ); - - return false; - } - - final UUID savedVersion = version.getVersion(); - - if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) { - logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", - new Object[] { - entityId.getUuid(), - entityId.getType(), - candidateResult.getVersion(), - savedVersion - } ); - - return false; - } - - return true; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java deleted file mode 100644 index 6230147..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.entity; - - -import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation; -import org.apache.usergrid.corepersistence.pipeline.read.Filter; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import rx.Observable; - - -/** - * This command is a stopgap to make migrating 1.0 code easier. Once full traversal has been implemented, this should - * be removed - */ -public class EntityIdFilter extends AbstractPipelineOperation<Id, Id> implements Filter<Id, Id> { - - private final Id entityId; - - - @Inject - public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;} - - - - - @Override - public Observable<Id> call( final Observable<Id> idObservable ) { - return Observable.just( entityId ); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java deleted file mode 100644 index dd6b9b8..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.entity; - - -import java.util.List; - -import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation; -import org.apache.usergrid.corepersistence.pipeline.read.Collector; -import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.EntitySet; -import org.apache.usergrid.persistence.collection.MvccEntity; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; - -import rx.Observable; - - -/** - * Loads entities from a set of Ids. - * - * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification - */ -public class EntityLoadCollector extends AbstractPipelineOperation<Id, ResultsPage> - implements Collector<Id, ResultsPage> { - - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - - - @Inject - public EntityLoadCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory ) { - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - } - - - @Override - public Observable<ResultsPage> call( final Observable<Id> observable ) { - - - final EntityCollectionManager entityCollectionManager = - entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() ); - - final Observable<EntitySet> entitySetObservable = observable.buffer( pipelineContext.getLimit() ).flatMap( - bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> entityCollectionManager.load( ids ) ) ); - - - final Observable<ResultsPage> resultsObservable = entitySetObservable - - .flatMap( entitySet -> { - - //get our entites and filter missing ones, then collect them into a results object - final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() ); - - - //convert them to our old entity model, then filter abscent, meaning they weren't found - final Observable<List<Entity>> entitiesPageObservable = - mvccEntityObservable.filter( mvccEntity -> mvccEntity.getEntity().isPresent() ) - .map( mvccEntity -> mvccEntity.getEntity().get() ).toList(); - - //convert them to a list, then map them into results - return entitiesPageObservable.map( entities -> new ResultsPage( entities ) ); - } ); - - - return resultsObservable; - } - - /** - * Map a new cp entity to an old entity. May be null if not present - */ - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java index e0f69cf..42b352b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java @@ -20,8 +20,9 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph; -import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; import org.apache.usergrid.corepersistence.pipeline.read.Filter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.SearchByEdge; @@ -39,7 +40,7 @@ import rx.Observable; /** * Filter should take and Id and a graph edge, and ensure the connection between the two exists */ -public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOperation<Id, Id> implements +public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<Id, Id> implements Filter<Id, Id> { private final GraphManagerFactory graphManagerFactory; @@ -55,12 +56,13 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOp @Override - public Observable<Id> call( final Observable<Id> idObservable ) { + public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) { final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() ); - return idObservable.flatMap( id -> { + return filterValueObservable.flatMap( filterValue -> { final String edgeTypeName = getEdgeName(); + final Id id = filterValue.getValue(); //create our search final SearchByEdge searchByEdge = @@ -68,7 +70,7 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOp Optional.absent() ); //load the versions of the edge, take the first since that's all we need to validate existence, then emit the target node - return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ); + return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ).map( targetId -> new FilterResult<>(targetId, filterValue.getPath())); } ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java index 4dd34fc..503fcf9 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java @@ -21,8 +21,9 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph; import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; import org.apache.usergrid.corepersistence.pipeline.read.Filter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; @@ -38,7 +39,7 @@ import rx.Observable; /** * Command for reading graph edges */ -public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> { +public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> { private final GraphManagerFactory graphManagerFactory; @@ -52,7 +53,8 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id, @Override - public Observable<Id> call( final Observable<Id> observable ) { + public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) { + //get the graph manager final GraphManager graphManager = @@ -63,10 +65,11 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id, //return all ids that are emitted from this edge - return observable.flatMap( id -> { + return previousIds.flatMap( previousFilterValue -> { //set our our constant state final Optional<Edge> startFromCursor = getSeekValue(); + final Id id = previousFilterValue.getValue(); final SimpleSearchByEdgeType search = @@ -78,9 +81,9 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id, */ return graphManager.loadEdgesFromSource( search ) //set our cursor every edge we traverse - .doOnNext( edge -> setCursor( edge ) ) + //map our id from the target edge - .map( edge -> edge.getTargetNode() ); + .map( edge -> createFilterResult( edge.getTargetNode(), edge, previousFilterValue.getPath() ) ); } ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java new file mode 100644 index 0000000..5a0e026 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.graph; + + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; +import org.apache.usergrid.corepersistence.pipeline.read.Filter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import rx.Observable; + + +/** + * This command is a stopgap to make migrating 1.0 code easier. Once full traversal has been implemented, this should + * be removed + */ +public class EntityIdFilter extends AbstractFilter<Id, Id> implements Filter<Id, Id> { + + private final Id entityId; + + + @Inject + public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;} + + + + @Override + public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) { + //ignore what our input was, and simply emit the id specified + return filterValueObservable.map( idFilterResult -> new FilterResult( entityId, idFilterResult.getPath() )); + + } +}
