Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-641 3a1784f04 -> 6d54dffc4
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java new file mode 100644 index 0000000..d0b6af9 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.search; + + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; +import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.index.EntityIndexBatch; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.fasterxml.uuid.UUIDComparator; +import com.google.common.base.Optional; +import com.google.inject.Inject; + +import rx.Observable; + + +/** + * Loads entities from an incoming CandidateResult emissions into entities, then streams them on + * performs internal buffering for efficiency. Note that all entities may not be emitted if our load crosses page boundaries. It is up to the + * collector to determine when to stop streaming entities. + */ +public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate>, FilterResult<Entity>> { + + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final EntityIndexFactory entityIndexFactory; + + + @Inject + public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory, + final EntityIndexFactory entityIndexFactory ) { + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.entityIndexFactory = entityIndexFactory; + } + + + @Override + public Observable<FilterResult<Entity>> call( + final Observable<FilterResult<Candidate>> candidateResultsObservable ) { + + + /** + * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results + * objects + */ + + final ApplicationScope applicationScope = pipelineContext.getApplicationScope(); + + final EntityCollectionManager entityCollectionManager = + entityCollectionManagerFactory.createCollectionManager( applicationScope ); + + + final ApplicationEntityIndex applicationIndex = + entityIndexFactory.createApplicationEntityIndex( applicationScope ); + + //buffer them to get a page size we can make 1 network hop + final Observable<FilterResult<Entity>> searchIdSetObservable = candidateResultsObservable.buffer( pipelineContext.getLimit() ) + + //load them + .flatMap( candidateResults -> { + //flatten toa list of ids to load + final Observable<List<Id>> candidateIds = + Observable.from( candidateResults ).map( filterResultCandidate -> filterResultCandidate.getValue().getCandidateResult().getId() ).toList(); + + //load the ids + final Observable<EntitySet> entitySetObservable = + candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) ); + + //now we have a collection, validate our canidate set is correct. + + return entitySetObservable.map( + entitySet -> new EntityVerifier( applicationIndex.createBatch(), entitySet, + candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() ) + .flatMap( + entityCollector -> Observable.from( entityCollector.getResults() ) ); + } ); + + //if we filter all our results, we want to continue to try the next page + return searchIdSetObservable; + } + + + + + /** + * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally + */ + private static final class EntityVerifier { + + private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class ); + private List<FilterResult<Entity>> results = new ArrayList<>(); + + private final EntityIndexBatch batch; + private final List<FilterResult<Candidate>> candidateResults; + private final EntitySet entitySet; + + + public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet, + final List<FilterResult<Candidate>> candidateResults ) { + this.batch = batch; + this.entitySet = entitySet; + this.candidateResults = candidateResults; + this.results = new ArrayList<>( entitySet.size() ); + } + + + /** + * Merge our candidates and our entity set into results + */ + public void merge() { + + for ( final FilterResult<Candidate> candidateResult : candidateResults ) { + validate( candidateResult ); + } + + batch.execute(); + } + + + public List<FilterResult<Entity>> getResults() { + return results; + } + + + public EntityIndexBatch getBatch() { + return batch; + } + + + private void validate( final FilterResult<Candidate> filterResult ) { + + final Candidate candidate = filterResult.getValue(); + final CandidateResult candidateResult = candidate.getCandidateResult(); + final SearchEdge searchEdge = candidate.getSearchEdge(); + final Id candidateId = candidateResult.getId(); + final UUID candidateVersion = candidateResult.getVersion(); + + + final MvccEntity entity = entitySet.getEntity( candidateId ); + + + //doesn't exist warn and drop + if ( entity == null ) { + logger.warn( + "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra." + + " Ignoring since this could be a region sync issue", + candidateId, candidateVersion ); + + + //TODO trigger an audit after a fail count where we explicitly try to repair from other regions + + return; + + } + + + final UUID entityVersion = entity.getVersion(); + final Id entityId = entity.getId(); + + + + + + //entity is newer than ES version, could be an update or the entity is marked as deleted + if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) { + + logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}", + new Object[] { searchEdge, entityId, entityVersion } ); + batch.deindex( searchEdge, entityId, candidateVersion ); + return; + } + + //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to + //remove the ES record, since the read in cass should cause a read repair, just ignore + if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) { + + logger.warn( + "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair " + + "should be run", new Object[] { searchEdge, entityId, entityVersion } ); + + //TODO trigger an audit after a fail count where we explicitly try to repair from other regions + + return; + } + + //they're the same add it + + final Entity returnEntity = entity.getEntity().get(); + + final Optional<EdgePath> parent = filterResult.getPath(); + + final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent ); + + results.add( toReturn ); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java new file mode 100644 index 0000000..1ef358a --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.search; + + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.VersionSet; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.index.EntityIndexBatch; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.fasterxml.uuid.UUIDComparator; +import com.google.inject.Inject; + +import rx.Observable; + + +/** + * Responsible for verifying candidate result versions, then emitting the Ids of these versions Input is a batch of + * candidate results, output is a stream of validated Ids + */ +public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>, FilterResult<Id>> { + + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final EntityIndexFactory entityIndexFactory; + + + @Inject + public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory, + final EntityIndexFactory entityIndexFactory ) { + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.entityIndexFactory = entityIndexFactory; + } + + + @Override + public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) { + + + /** + * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results + * objects + */ + + final ApplicationScope applicationScope = pipelineContext.getApplicationScope(); + + final EntityCollectionManager entityCollectionManager = + entityCollectionManagerFactory.createCollectionManager( applicationScope ); + + + final ApplicationEntityIndex applicationIndex = + entityIndexFactory.createApplicationEntityIndex( applicationScope ); + + final Observable<FilterResult<Id>> searchIdSetObservable = + filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( candidateResults -> { + //flatten toa list of ids to load + final Observable<List<Id>> candidateIds = Observable.from( candidateResults ).map( + candidate -> candidate.getValue().getCandidateResult().getId() ).toList(); + + //load the ids + final Observable<VersionSet> versionSetObservable = + candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) ); + + //now we have a collection, validate our canidate set is correct. + + return versionSetObservable.map( + entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, + candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap( + entityCollector -> Observable.from( entityCollector.collectResults() ) ); + } ); + + return searchIdSetObservable; + } + + + /** + * Map a new cp entity to an old entity. May be null if not present + */ + private static final class EntityCollector { + + private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class ); + private List<FilterResult<Id>> results = new ArrayList<>(); + + private final EntityIndexBatch batch; + private final List<FilterResult<Candidate>> candidateResults; + private final VersionSet versionSet; + + + public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet, + final List<FilterResult<Candidate>> candidateResults ) { + this.batch = batch; + this.versionSet = versionSet; + this.candidateResults = candidateResults; + this.results = new ArrayList<>( versionSet.size() ); + } + + + /** + * Merge our candidates and our entity set into results + */ + public void merge() { + + for ( final FilterResult<Candidate> candidateResult : candidateResults ) { + validate( candidateResult ); + } + + batch.execute(); + } + + + public List<FilterResult<Id>> collectResults() { + return results; + } + + + /** + * Validate each candidate results vs the data loaded from cass + */ + private void validate( final FilterResult<Candidate> filterCandidate ) { + + final CandidateResult candidateResult = filterCandidate.getValue().getCandidateResult(); + + final SearchEdge searchEdge = filterCandidate.getValue().getSearchEdge(); + + final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() ); + + final UUID candidateVersion = candidateResult.getVersion(); + + final UUID entityVersion = logEntry.getVersion(); + + final Id entityId = logEntry.getEntityId(); + + //entity is newer than ES version + if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) { + + logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}", + new Object[] { searchEdge, entityId, entityVersion } ); + batch.deindex( searchEdge, entityId, entityVersion ); + return; + } + + //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to + //remove the ES record, since the read in cass should cause a read repair, just ignore + if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) { + + logger.warn( + "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair " + + "should be run", new Object[] { searchEdge, entityId, entityVersion } ); + } + + //they're the same add it + + final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath() ); + + results.add( result ); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/ElasticsearchCursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/ElasticsearchCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/ElasticsearchCursorSerializer.java new file mode 100644 index 0000000..7cf5a78 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/ElasticsearchCursorSerializer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.search; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer; + + +/** + * ElasticSearch cursor serializer + */ +public class ElasticsearchCursorSerializer extends AbstractCursorSerializer<Integer> { + + + public static final ElasticsearchCursorSerializer INSTANCE = new ElasticsearchCursorSerializer(); + + @Override + protected Class<Integer> getType() { + return Integer.class; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Elasticsearchdiagram.jpg ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Elasticsearchdiagram.jpg b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Elasticsearchdiagram.jpg new file mode 100644 index 0000000..08970e3 Binary files /dev/null and b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Elasticsearchdiagram.jpg differ http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchCollectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchCollectionFilter.java new file mode 100644 index 0000000..a6edd56 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchCollectionFilter.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.search; + + +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.SearchTypes; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge; + + +public class SearchCollectionFilter extends AbstractElasticSearchFilter { + + private final String collectionName; + private final String entityType; + + /** + * Create a new instance of our command + * + * @param entityIndexFactory The entity index factory used to search + * @param metricsFactory The metrics factory for metrics + * @param collectionName The name of the collection + * @param entityType The entity type + */ + @Inject + public SearchCollectionFilter( final EntityIndexFactory entityIndexFactory, final MetricsFactory metricsFactory, + @Assisted( "query" ) final String query, + @Assisted( "collectionName" ) final String collectionName, + @Assisted( "entityType" ) final String entityType ) { + super( entityIndexFactory, metricsFactory, query ); + this.collectionName = collectionName; + this.entityType = entityType; + } + + + + @Override + protected SearchTypes getSearchTypes() { + final SearchTypes types = SearchTypes.fromTypes( entityType ); + + return types; + } + + + @Override + protected SearchEdge getSearchEdge( final Id incomingId ) { + final SearchEdge searchEdge = createCollectionSearchEdge( incomingId, collectionName ); + + return searchEdge; + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchConnectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchConnectionFilter.java new file mode 100644 index 0000000..82d7377 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchConnectionFilter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.search; + + +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.SearchTypes; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchEdge; + + +public class SearchConnectionFilter extends AbstractElasticSearchFilter { + + + private final String connectionName; + private final Optional<String> connectedEntityType; + + + /** + * Create a new instance of our command + */ + @Inject + public SearchConnectionFilter( final EntityIndexFactory entityIndexFactory, final MetricsFactory metricsFactory, + @Assisted( "query" ) final String query, + @Assisted( "connectionName" ) final String connectionName, + @Assisted( "connectedEntityType" ) final Optional<String> connectedEntityType ) { + super( entityIndexFactory, metricsFactory, query ); + + this.connectionName = connectionName; + this.connectedEntityType = connectedEntityType; + } + + + @Override + protected SearchTypes getSearchTypes() { + final SearchTypes searchTypes = SearchTypes.fromNullableTypes( connectedEntityType.orNull() ); + + return searchTypes; + } + + + @Override + protected SearchEdge getSearchEdge( final Id id ) { + final SearchEdge searchEdge = createConnectionSearchEdge( id, connectionName ); + + return searchEdge; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphEdgeByIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphEdgeByIdFilter.java new file mode 100644 index 0000000..5b3a42e --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphEdgeByIdFilter.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.traverse; + + +import org.apache.usergrid.corepersistence.pipeline.PipelineOperation; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.SearchByEdge; +import org.apache.usergrid.persistence.graph.SearchByEdgeType; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import rx.Observable; + + +/** + * Filter should take and Id and a graph edge, and ensure the connection between the two exists + */ +public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Id>> implements + PipelineOperation<FilterResult<Id>, FilterResult<Id>> { + + private final GraphManagerFactory graphManagerFactory; + private final Id targetId; + + + @Inject + public AbstractReadGraphEdgeByIdFilter( final GraphManagerFactory graphManagerFactory, @Assisted final Id + targetId ) { + this.graphManagerFactory = graphManagerFactory; + this.targetId = targetId; + } + + + @Override + public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) { + + final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() ); + + return filterValueObservable.flatMap( filterValue -> { + final String edgeTypeName = getEdgeName(); + final Id id = filterValue.getValue(); + + //create our search + final SearchByEdge searchByEdge = + new SimpleSearchByEdge( id, edgeTypeName, targetId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + Optional.absent() ); + + //load the versions of the edge, take the first since that's all we need to validate existence, then emit the target node + return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ).map( targetId -> new FilterResult<>(targetId, filterValue.getPath())); + } ); + } + + + /** + * Get the name of the edge to be used in the seek + */ + protected abstract String getEdgeName(); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java new file mode 100644 index 0000000..f477092 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.traverse; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; +import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.SearchByEdgeType; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; + +import rx.Observable; + + +/** + * Command for reading graph edges + */ +public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> { + + private final GraphManagerFactory graphManagerFactory; + + + /** + * Create a new instance of our command + */ + public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) { + this.graphManagerFactory = graphManagerFactory; + } + + + @Override + public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) { + + + //get the graph manager + final GraphManager graphManager = + graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() ); + + + final String edgeName = getEdgeTypeName(); + final EdgeState edgeCursorState = new EdgeState(); + + + //return all ids that are emitted from this edge + return previousIds.flatMap( previousFilterValue -> { + + //set our our constant state + final Optional<Edge> startFromCursor = getSeekValue(); + final Id id = previousFilterValue.getValue(); + + + final SimpleSearchByEdgeType search = + new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + startFromCursor ); + + /** + * TODO, pass a message with pointers to our cursor values to be generated later + */ + return graphManager.loadEdgesFromSource( search ) + //set the edge state for cursors + .doOnNext( edge -> edgeCursorState.update( edge ) ) + + //map our id from the target edge and set our cursor every edge we traverse + .map( edge -> createFilterResult( edge.getTargetNode(), edgeCursorState.getCursorEdge(), + previousFilterValue.getPath() ) ); + } ); + } + + + @Override + protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue, + final Optional<EdgePath> parent ) { + + //if it's our first pass, there's no cursor to generate + if(cursorValue == null){ + return new FilterResult<>( emit, parent ); + } + + return super.createFilterResult( emit, cursorValue, parent ); + } + + + @Override + protected CursorSerializer<Edge> getCursorSerializer() { + return EdgeCursorSerializer.INSTANCE; + } + + + /** + * Get the edge type name we should use when traversing + */ + protected abstract String getEdgeTypeName(); + + + /** + * Wrapper class. Because edges seek > the last returned, we need to keep our n-1 value. This will be our cursor We + * always try to seek to the same position as we ended. Since we don't deal with a persistent read result, if we + * seek to a value = to our last, we may skip data. + */ + private final class EdgeState { + + private Edge cursorEdge = null; + private Edge currentEdge = null; + + + /** + * Update the pointers + */ + private void update( final Edge newEdge ) { + cursorEdge = currentEdge; + currentEdge = newEdge; + } + + + /** + * Get the edge to use in cursors for resume + */ + private Edge getCursorEdge() { + return cursorEdge; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java new file mode 100644 index 0000000..8d9bf6f --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.traverse; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.impl.SimpleEdge; + + +/** + * Edge cursor serializer + */ +public class EdgeCursorSerializer extends AbstractCursorSerializer<Edge> { + + + public static final EdgeCursorSerializer INSTANCE = new EdgeCursorSerializer(); + + @Override + protected Class<SimpleEdge> getType() { + return SimpleEdge.class; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityIdFilter.java new file mode 100644 index 0000000..003038a --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityIdFilter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.traverse; + + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import rx.Observable; + + +/** + * This command is a stopgap to make migrating 1.0 code easier. Once full traversal has been implemented, this should + * be removed + */ +public class EntityIdFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Id>>{ + + private final Id entityId; + + + @Inject + public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;} + + + + @Override + public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) { + //ignore what our input was, and simply emit the id specified + return filterValueObservable.map( idFilterResult -> new FilterResult( entityId, idFilterResult.getPath() )); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java new file mode 100644 index 0000000..41507e9 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.traverse; + + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; +import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; +import com.google.inject.Inject; + +import rx.Observable; + + +/** + * Loads entities from a set of Ids. and verify they are valid + * + * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification + */ +public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Entity>>{ + + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + + + @Inject + public EntityLoadVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) { + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + } + + + @Override + public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) { + + + final EntityCollectionManager entityCollectionManager = + entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() ); + + //it's more efficient to make 1 network hop to get everything, then drop our results if required + final Observable<FilterResult<Entity>> entityObservable = + filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> { + + final Observable<EntitySet> entitySetObservable = + Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList() + .flatMap( ids -> entityCollectionManager.load( ids ) ); + + + //now we have a collection, validate our canidate set is correct. + + return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) ) + .doOnNext( entityCollector -> entityCollector.merge() ).flatMap( + entityCollector -> Observable.from( entityCollector.getResults() ) ); + } ); + + return entityObservable; + } + + + /** + * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this + * state is mutable and difficult to represent functionally + */ + private static final class EntityVerifier { + + private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class ); + private List<FilterResult<Entity>> results = new ArrayList<>(); + + private final List<FilterResult<Id>> candidateResults; + private final EntitySet entitySet; + + + public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) { + this.entitySet = entitySet; + this.candidateResults = candidateResults; + this.results = new ArrayList<>( entitySet.size() ); + } + + + /** + * Merge our candidates and our entity set into results + */ + public void merge() { + + for ( final FilterResult<Id> candidateResult : candidateResults ) { + validate( candidateResult ); + } + } + + + public List<FilterResult<Entity>> getResults() { + return results; + } + + + private void validate( final FilterResult<Id> filterResult ) { + + final Id candidateId = filterResult.getValue(); + + + final MvccEntity entity = entitySet.getEntity( candidateId ); + + + //doesn't exist warn and drop + if ( entity == null || !entity.getEntity().isPresent() ) { + logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra." + + " Ignoring since this could be a region sync issue", candidateId ); + + + //TODO trigger an audit after a fail count where we explicitly try to repair from other regions + + return; + } + + //it exists, add it + + final Entity returnEntity = entity.getEntity().get(); + + final Optional<EdgePath> parent = filterResult.getPath(); + + final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent ); + + results.add( toReturn ); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/GraphDiagram.jpg ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/GraphDiagram.jpg b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/GraphDiagram.jpg new file mode 100644 index 0000000..c0308bd Binary files /dev/null and b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/GraphDiagram.jpg differ http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionByIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionByIdFilter.java new file mode 100644 index 0000000..71d606c --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionByIdFilter.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.traverse; + + +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + + +/** + * Read an edge in the graph to verify it's existence by id + */ +public class ReadGraphCollectionByIdFilter extends AbstractReadGraphEdgeByIdFilter{ + + private final String collectionName; + + @Inject + public ReadGraphCollectionByIdFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName, @Assisted final Id targetId ) { + super( graphManagerFactory, targetId ); + this.collectionName = collectionName; + } + + + @Override + protected String getEdgeName() { + return CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java new file mode 100644 index 0000000..dc39f5c --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.traverse; + + +import org.apache.usergrid.persistence.graph.GraphManagerFactory; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromCollectionName; + + +/** + * Command for reading graph edges on a collection + */ +public class ReadGraphCollectionFilter extends AbstractReadGraphFilter { + + private final String collectionName; + + + /** + * Create a new instance of our command + */ + @Inject + public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName ) { + super( graphManagerFactory ); + this.collectionName = collectionName; + } + + + @Override + protected String getEdgeTypeName() { + return getEdgeTypeFromCollectionName( collectionName ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByIdFilter.java new file mode 100644 index 0000000..4f71202 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByIdFilter.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.traverse; + + +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + + +/** + * Read an edge in the graph to verify it's existence by id + */ +public class ReadGraphConnectionByIdFilter extends AbstractReadGraphEdgeByIdFilter{ + + private final String connectionName; + + @Inject + public ReadGraphConnectionByIdFilter( final GraphManagerFactory graphManagerFactory, + @Assisted final String connectionName, @Assisted final Id targetId ) { + super( graphManagerFactory, targetId ); + this.connectionName = connectionName; + } + + + @Override + protected String getEdgeName() { + return CpNamingUtils.getEdgeTypeFromConnectionType( connectionName ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java new file mode 100644 index 0000000..61ba4ad --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.traverse; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.SearchByEdgeType; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import rx.Observable; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType; + + +/** + * Command for reading graph edges on a connection + */ +public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge>{ + + private final GraphManagerFactory graphManagerFactory; + private final String connectionName; + private final String entityType; + + + /** + * Create a new instance of our command + */ + @Inject + public ReadGraphConnectionByTypeFilter( final GraphManagerFactory graphManagerFactory, + @Assisted("connectionName") final String connectionName, @Assisted("entityType") final String entityType ) { + this.graphManagerFactory = graphManagerFactory; + this.connectionName = connectionName; + this.entityType = entityType; + } + + + + @Override + public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterResultObservable ) { + + //get the graph manager + final GraphManager graphManager = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() ); + + + + final String edgeName = getEdgeTypeFromConnectionType( connectionName ); + + + //return all ids that are emitted from this edge + return filterResultObservable.flatMap( idFilterResult -> { + + //set our our constant state + final Optional<Edge> startFromCursor = getSeekValue(); + final Id id = idFilterResult.getValue(); + + final SimpleSearchByIdType search = + new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + entityType, startFromCursor ); + + return graphManager.loadEdgesFromSourceByType( search ).map( + edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() )); + } ); + } + + + @Override + protected CursorSerializer<Edge> getCursorSerializer() { + return EdgeCursorSerializer.INSTANCE; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java new file mode 100644 index 0000000..11ec5f8 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.traverse; + + +import org.apache.usergrid.persistence.graph.GraphManagerFactory; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType; + + +/** + * Command for reading graph edges on a connection + */ +public class ReadGraphConnectionFilter extends AbstractReadGraphFilter { + + private final String connectionName; + + + /** + * Create a new instance of our command + */ + @Inject + public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String connectionName ) { + super( graphManagerFactory ); + this.connectionName = connectionName; + } + + + @Override + protected String getEdgeTypeName() { + return getEdgeTypeFromConnectionType( connectionName ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java index 0260d1d..c779bb7 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java @@ -49,7 +49,7 @@ public class ObservableQueryExecutor implements QueryExecutor { public Iterator<Results> iterator; - public ObservableQueryExecutor( final Observable<ResultsPage> resultsObservable) { + public ObservableQueryExecutor( final Observable<ResultsPage<Entity>> resultsObservable) { //map to our old results objects, return a default empty if required this.resultsObservable = resultsObservable.map( resultsPage -> createResults( resultsPage ) ).defaultIfEmpty( new Results() ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java index fd65ebf..7128dcf 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java @@ -25,8 +25,8 @@ package org.apache.usergrid.corepersistence.pipeline.cursor; import org.junit.Test; import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticsearchCursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.read.graph.EdgeCursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.read.search.ElasticsearchCursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.read.traverse.EdgeCursorSerializer; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
