http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java new file mode 100644 index 0000000..ccc5198 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; + + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.usergrid.persistence.index.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + +import com.google.common.base.Optional; + + +public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<Results> { + + private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class ); + + private final ResultsLoaderFactory resultsLoaderFactory; + + private final ApplicationScope applicationScope; + + private final ApplicationEntityIndex entityIndex; + + private final SearchEdge indexScope; + + private final SearchTypes types; + + private final Query query; + + + private Results currentResults; + + private boolean moreToLoad = true; + + + public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final ApplicationEntityIndex entityIndex, + final ApplicationScope applicationScope, final SearchEdge indexScope, + final SearchTypes types, final Query query ) { + this.resultsLoaderFactory = resultsLoaderFactory; + this.applicationScope = applicationScope; + this.entityIndex = entityIndex; + this.indexScope = indexScope; + this.types = types; + + //we must deep copy the query passed. Otherwise we will modify it's state with cursors. Won't fix, not relevant + //once we start subscribing to streams. + this.query = new Query(query); + } + + + @Override + public Iterator<Results> iterator() { + return this; + } + + + private void loadNextPage() { + // Because of possible stale entities, which are filtered out by buildResults(), + // we loop until the we've got enough results to satisfy the query limit. 
+ + final int maxQueries = 10; // max re-queries to satisfy query limit + + final int originalLimit = query.getLimit(); + + Results results = null; + int queryCount = 0; + + + CandidateResults crs = null; + + while ( queryCount++ < maxQueries ) { + + crs = getCandidateResults( query ); + + + logger.debug( "Calling build results with crs {}", crs ); + results = buildResults( indexScope, query, crs ); + + /** + * In an edge case where we delete stale entities, we could potentially get less results than expected. + * This will only occur once during the repair phase. + * We need to ensure that we short circuit before we overflow the requested limit during a repair. + */ + if ( crs.isEmpty() || !crs.hasOffset() || results.size() > 0 ) { // no results, no cursor, can't get more + break; + } + + + //we didn't load anything, but there was a cursor, this means a read repair occured. We have to short + //circuit to avoid over returning the result set + + + // need to query for more + // ask for just what we need to satisfy, don't want to exceed limit + query.setOffsetFromCursor(results.getCursor()); + query.setLimit( originalLimit - results.size() ); + + logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] { + originalLimit, query.getLimit(), queryCount + } ); + } + + //now set our cursor if we have one for the next iteration + if ( results.hasCursor() ) { + query.setOffsetFromCursor(results.getCursor()); + moreToLoad = true; + } + + else { + moreToLoad = false; + } + + + //set our select subjects into our query if provided + if(crs != null){ + query.setSelectSubjects( crs.getGetFieldMappings() ); + } + + + //set our current results and the flag + this.currentResults = results; + } + + + /** + * Get the candidates or load the cursor, whichever we require + * @param query + * @return + */ + private CandidateResults getCandidateResults(final Query query){ + final Optional<Integer> cursor = query.getOffset(); + final String queryToExecute = 
query.getQl().or("select *"); + + CandidateResults results = cursor.isPresent() + ? entityIndex.search( indexScope, types, queryToExecute, query.getLimit() , cursor.get()) + : entityIndex.search( indexScope, types, queryToExecute, query.getLimit()); + + return results; + } + + + /** + * Build results from a set of candidates, and discard those that represent stale indexes. + * + * @param query Query that was executed + * @param crs Candidates to be considered for results + */ + private Results buildResults( final SearchEdge indexScope, final Query query, final CandidateResults crs ) { + + logger.debug( "buildResults() from {} candidates", crs.size() ); + + //get an instance of our results loader + final ResultsLoader resultsLoader = + this.resultsLoaderFactory.getLoader( applicationScope, indexScope, query.getResultsLevel() ); + + //load the results + final Results results = resultsLoader.loadResults(crs); + + //signal for post processing + resultsLoader.postProcess(); + + //set offset into query + if(crs.getOffset().isPresent()) { + query.setOffset(crs.getOffset().get()); + }else{ + query.clearOffset(); + } + results.setCursorFromOffset( query.getOffset() ); + + logger.debug( "Returning results size {}", results.size() ); + + return results; + } + + + @Override + public boolean hasNext() { + + //we've tried to load and it's empty and we have more to load, load the next page + if ( currentResults == null ) { + //there's nothing left to load, nothing to do + if ( !moreToLoad ) { + return false; + } + + //load the page + + loadNextPage(); + } + + + //see if our current results are not null + return currentResults != null; + } + + + @Override + public Results next() { + if ( !hasNext() ) { + throw new NoSuchElementException( "No more results present" ); + } + + final Results toReturn = currentResults; + + currentResults = null; + + return toReturn; + } + + @Override + public void remove() { + throw new RuntimeException("Remove not implemented!!"); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java new file mode 100644 index 0000000..d73c731 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java @@ -0,0 +1,127 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. The ASF licenses this file to You + * * under the Apache License, Version 2.0 (the "License"); you may not + * * use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. For additional information regarding + * * copyright in this work, please see the NOTICE file in the top level + * * directory of this distribution. 
+ * + */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; +import org.apache.usergrid.persistence.Entity; +import org.apache.usergrid.persistence.EntityFactory; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.fasterxml.uuid.UUIDComparator; +import com.google.common.base.Optional; + + +/** + * A loader that verifies versions are correct in cassandra and match elasticsearch + */ +public class EntityVerifier implements ResultsVerifier { + + private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class ); + + private EntitySet ids; + + private Map<Id, org.apache.usergrid.persistence.model.entity.Entity> entityMapping; + + + public EntityVerifier( final int maxSize ) { + this.entityMapping = new HashMap<>( maxSize ); + } + + + @Override + public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) { + ids = ecm.load( idsToLoad ).toBlocking().last(); + logger.debug("loadResults() asked for {} ids and got {}", idsToLoad.size(), ids.size()); + } + + + @Override + public boolean isValid( final CandidateResult candidateResult ) { + final Id entityId = candidateResult.getId(); + + final MvccEntity savedEntity = ids.getEntity( entityId ); + + //version wasn't found deindex + if ( savedEntity == null ) { + logger.warn( "Version for Entity {}:{} not found", entityId.getType(), 
entityId.getUuid() ); + return false; + } + + final UUID candidateVersion = candidateResult.getVersion(); + final UUID savedVersion = savedEntity.getVersion(); + + if ( UUIDComparator.staticCompare( savedVersion, candidateVersion ) > 0 ) { + logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] { + entityId.getUuid(), entityId.getType(), candidateVersion, savedEntity + } ); + + return false; + } + + + final Optional<org.apache.usergrid.persistence.model.entity.Entity> entity = savedEntity.getEntity(); + + if ( !entity.isPresent() ) { + logger.warn( "Entity uuid:{} version v:{} is deleted but indexed, this is a bug ", + entityId.getUuid(), savedEntity.getEntity() ); + return false; + } + + entityMapping.put( entityId, entity.get() ); + + return true; + } + + + @Override + public Results getResults( final Collection<Id> ids ) { + + final List<Entity> ugEntities = new ArrayList<>( ids.size() ); + + for ( final Id id : ids ) { + final org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityMapping.get( id ); + + Entity entity = EntityFactory.newEntity( id.getUuid(), id.getType() ); + + Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity ); + entity.addProperties( entityMap ); + ugEntities.add( entity ); + } + + return Results.fromEntities( ugEntities ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java new file mode 100644 index 0000000..ade64a2 --- /dev/null +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; + + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.ManagerCache; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexBatch; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.CandidateResult; +import 
org.apache.usergrid.persistence.index.CandidateResults; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.fasterxml.uuid.UUIDComparator; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; + + +public class FilteringLoader implements ResultsLoader { + + private static final Logger logger = LoggerFactory.getLogger( FilteringLoader.class ); + + private final EntityCollectionManager entityCollectionManager; + private final ResultsVerifier resultsVerifier; + private final ApplicationScope applicationScope; + private final SearchEdge indexScope; + private final EntityIndexBatch indexBatch; + + + /** + * Create an instance of a filter loader + * + * @param entityCollectionManager The entityCollectionManagerFactory + * @param resultsVerifier The verifier to verify the candidate results + * @param applicationScope The application scope to perform the load + * @param indexScope The index scope used in the search + */ + protected FilteringLoader( final EntityCollectionManager entityCollectionManager, final ApplicationEntityIndex applicationEntityIndex, final ResultsVerifier resultsVerifier, + final ApplicationScope applicationScope, final SearchEdge indexScope ) { + + this.entityCollectionManager = entityCollectionManager; + this.resultsVerifier = resultsVerifier; + this.applicationScope = applicationScope; + this.indexScope = indexScope; + + indexBatch = applicationEntityIndex.createBatch(); + } + + + @Override + public Results loadResults( final CandidateResults crs ) { + + + if ( crs.size() == 0 ) { + return new Results(); + } + + + // For each entity, holds the index it appears in our candidates for keeping ordering correct + final Map<Id, Integer> orderIndex = new HashMap<>( crs.size() ); + + // Maps the entity ids to our candidates + final Map<Id, CandidateResult> maxCandidateMapping = new HashMap<>( crs.size() ); + + + final Iterator<CandidateResult> iter = crs.iterator(); + + + // TODO, in this case we're 
"optimizing" due to the limitations of collection scope. + // Perhaps we should change the API to just be an application, then an "owner" scope? + + // Go through the candidates and group them by scope for more efficient retrieval. + // Also remove duplicates before we even make a network call + for ( int i = 0; iter.hasNext(); i++ ) { + + final CandidateResult currentCandidate = iter.next(); + + final Id entityId = currentCandidate.getId(); + + //check if we've seen this candidate by id + final CandidateResult previousMax = maxCandidateMapping.get( entityId ); + + //its not been seen, save it + if ( previousMax == null ) { + maxCandidateMapping.put( entityId, currentCandidate ); + orderIndex.put( entityId, i ); + continue; + } + + //we have seen it, compare them + + final UUID previousMaxVersion = previousMax.getVersion(); + + final UUID currentVersion = currentCandidate.getVersion(); + + + final CandidateResult toRemove; + final CandidateResult toKeep; + + //current is newer than previous. Remove previous and keep current + if ( UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ) { + toRemove = previousMax; + toKeep = currentCandidate; + } + //previously seen value is newer than current. 
Remove the current and keep the previously seen value + else { + toRemove = currentCandidate; + toKeep = previousMax; + } + + //this is a newer version, we know we already have a stale entity, add it to be cleaned up + + + //de-index it + logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] { + entityId.getUuid(), entityId.getType(), toRemove.getVersion(), toKeep.getVersion() + } ); + + //deindex this document, and remove the previous maxVersion + //we have to deindex this from our ownerId, since this is what gave us the reference + indexBatch.deindex( indexScope, toRemove ); + + + //TODO, fire the entity repair cleanup task here instead of de-indexing + + //replace the value with a more current version + maxCandidateMapping.put( entityId, toKeep ); + orderIndex.put( entityId, i ); + } + + + //now everything is ordered, and older versions are removed. Batch fetch versions to verify + // existence and correct versions + + final TreeMap<Integer, Id> sortedResults = new TreeMap<>(); + + + final Collection<Id> idsToLoad = + Collections2.transform( maxCandidateMapping.values(), new Function<CandidateResult, Id>() { + @Nullable + @Override + public Id apply( @Nullable final CandidateResult input ) { + //NOTE this is never null, we won't need to check + return input.getId(); + } + } ); + + + //now using the scope, load the collection + + + + //load the results into the loader for this scope for validation + resultsVerifier.loadResults( idsToLoad, entityCollectionManager ); + + //now let the loader validate each candidate. 
For instance, the "max" in this candidate + //could still be a stale result, so it needs validated + for ( final Id requestedId : idsToLoad ) { + + final CandidateResult cr = maxCandidateMapping.get( requestedId ); + + //ask the loader if this is valid, if not discard it and de-index it + if ( !resultsVerifier.isValid( cr ) ) { + indexBatch.deindex( indexScope, cr ); + continue; + } + + //if we get here we're good, we need to add this to our results + final int candidateIndex = orderIndex.get( requestedId ); + + sortedResults.put( candidateIndex, requestedId ); + } + + + // NOTE DO NOT execute the batch here. + // It changes the results and we need consistent paging until we aggregate all results + return resultsVerifier.getResults( sortedResults.values() ); + } + + + @Override + public void postProcess() { + this.indexBatch.execute(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java new file mode 100644 index 0000000..4a3bfcd --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.model.entity.Id; + + +public class IdsVerifier extends VersionVerifier { + + @Override + public Results getResults( final Collection<Id> ids ) { + + final List<UUID> returnIds = new ArrayList<>( ids.size() ); + + for ( final Id id : ids ) { + returnIds.add( id.getUuid() ); + } + + + return Results.fromIdList( returnIds ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java new file mode 100644 index 0000000..c2a3e9a --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; + + +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.index.CandidateResults; + + +/** + * Interface for loading results + */ +public interface ResultsLoader { + + /** + * Using the candidate results, load our results. Should filter stale results + * @param crs The candidate result set + * @return Results. 
Null safe, but may be empty + */ + public Results loadResults( final CandidateResults crs); + + /** + * Post process the load operation + */ + public void postProcess(); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java new file mode 100644 index 0000000..3ccca1b --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; + + +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.SearchEdge; + + +/** + * Factory for creating results + */ +public interface ResultsLoaderFactory { + + /** + * Get the loader for results + * @param applicationScope The application scope used to load results + * @param indexScope The index scope used in the search + * @param + */ + ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge indexScope, + final Query.Level resultsLevel ); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java new file mode 100644 index 0000000..fe72ca2 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; + + +import java.util.Collection; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.model.entity.Id; + + +public interface ResultsVerifier { + + /** + * Load all the candidate ides for verification + * @param ids The Id's to load + * @param ecm The entity collection manager + */ + public void loadResults(Collection<Id> ids, EntityCollectionManager ecm); + + /** + * Return true if the candidate result is a valid result that should be retained. 
If it should + * not it should also be removed from the list of possible return values in this loader + * @param candidateResult + */ + public boolean isValid(CandidateResult candidateResult); + + + /** + * Load the result set with the given ids + * @return + */ + public Results getResults(Collection<Id> ids); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java new file mode 100644 index 0000000..c49fb28 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl; + + +import java.util.Collection; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.VersionSet; +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.fasterxml.uuid.UUIDComparator; + + +/** + * A loader that verifies versions are correct in Cassandra and match ElasticSearch + */ +public abstract class VersionVerifier implements ResultsVerifier { + + private static final Logger logger = LoggerFactory.getLogger( VersionVerifier.class ); + + private VersionSet ids; + + + @Override + public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) { + ids = ecm.getLatestVersion( idsToLoad ).toBlocking().last(); + } + + + @Override + public boolean isValid( final CandidateResult candidateResult ) { + final Id entityId = candidateResult.getId(); + + final MvccLogEntry version = ids.getMaxVersion( entityId ); + + //version wasn't found ,deindex + if ( version == null ) { + logger.warn( "Version for Entity {}:{} not found", + entityId.getUuid(), entityId.getUuid() ); + + return false; + } + + final UUID savedVersion = version.getVersion(); + + if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) { + logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", + new Object[] { + entityId.getUuid(), + entityId.getType(), + candidateResult.getVersion(), + savedVersion + } ); + + return false; + } + + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java 
---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java new file mode 100644 index 0000000..5734a5b --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.entity; + + +import java.io.Serializable; + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; +import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import rx.Observable; + + +/** + * This command is a stopgap to make migrating 1.0 code easier. 
Once full traversal has been implemented, this should
+ * be removed
+ */
+public class EntityIdFilter extends AbstractFilter<Id, Serializable> implements TraverseFilter {
+
+    private final Id entityId;
+
+    @Inject
+    public EntityIdFilter( @Assisted final Id entityId ) {
+        this.entityId = entityId;
+    }
+
+    /** Returns null; this filter provides no cursor class */
+    @Override
+    protected Class<Serializable> getCursorClass() {
+        return null;
+    }
+
+    /** Emits only the id handed to the constructor; the incoming observable is not subscribed to */
+    @Override
+    public Observable<Id> call( final Observable<Id> idObservable ) {
+        return Observable.just( entityId );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
new file mode 100644
index 0000000..74f626d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.entity; + + +import java.io.Serializable; +import java.util.Map; + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; +import org.apache.usergrid.corepersistence.pipeline.read.CollectorFilter; +import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; +import org.apache.usergrid.persistence.EntityFactory; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; + +import rx.Observable; + + +/** + * Loads entities from a set of Ids. 
+ *
+ * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
+ */
+public class EntityLoadCollectorFilter extends AbstractFilter<Results, Serializable>
+    implements CollectorFilter<Results> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final ApplicationScope applicationScope;
+
+    /** Number of ids loaded per batch; only assigned by setLimit, so it is 0 until that is called */
+    private int resultSize;
+
+
+    @Inject
+    public EntityLoadCollectorFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                      final ApplicationScope applicationScope ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.applicationScope = applicationScope;
+    }
+
+
+    /** Returns null; this collector provides no cursor class */
+    @Override
+    protected Class<Serializable> getCursorClass() {
+        return null;
+    }
+
+
+    /**
+     * Buffers the incoming ids into batches of resultSize, loads each batch and emits one Results per batch that
+     * holds the entities which were found.
+     *
+     * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
+     * objects
+     */
+    @Override
+    public Observable<Results> call( final Observable<Id> observable ) {
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        //load the entities one buffered batch of ids at a time
+        final Observable<EntitySet> entitySetObservable =
+            observable.buffer( resultSize ).flatMap( bufferedIds -> entityCollectionManager.load( bufferedIds ) );
+
+        return entitySetObservable.flatMap( entitySet -> {
+            //get our entities and filter missing ones, then collect them into a results object
+            final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() );
+
+            //convert them to our old entity model. mapEntity returns null for entities that weren't found,
+            //so only the non-null ones are kept
+            return mvccEntityObservable.map( mvccEntity -> mapEntity( mvccEntity ) )
+                .filter( entity -> entity != null )
+
+                //convert them to a list, then map them into results
+                .toList().map( entities -> {
+                    final Results results = Results.fromEntities( entities );
+                    results.setCursor( generateCursor() );
+
+                    return results;
+                } )
+                //if no results are present, return an empty results
+                .singleOrDefault( new Results( ) );
+        } );
+    }
+
+
+    /**
+     * Map a new cp entity to an old entity.
+     * @return the old-model entity carrying the cp entity's properties, or null if the cp entity is not present
+     */
+    private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity mvccEntity ) {
+        if ( !mvccEntity.getEntity().isPresent() ) {
+            return null;
+        }
+
+        final Entity cpEntity = mvccEntity.getEntity().get();
+        final Id entityId = cpEntity.getId();
+
+        org.apache.usergrid.persistence.Entity entity =
+            EntityFactory.newEntity( entityId.getUuid(), entityId.getType() );
+
+        Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
+        entity.addProperties( entityMap );
+
+        return entity;
+    }
+
+
+    /** Sets the batch size used by call when buffering ids */
+    @Override
+    public void setLimit( final int limit ) {
+        this.resultSize = limit;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
new file mode 100644
index 0000000..2e8f041
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+
+/**
+ * Filter that takes an Id and a graph edge name, and ensures the connection between the Id and the target exists
+ */
+public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<Id, Id> implements TraverseFilter {
+
+    private final GraphManagerFactory graphManagerFactory;
+    private final Id targetId;
+
+
+    @Inject
+    public AbstractReadGraphEdgeByIdFilter( final GraphManagerFactory graphManagerFactory,
+                                            @Assisted final Id targetId ) {
+        this.graphManagerFactory = graphManagerFactory;
+        this.targetId = targetId;
+    }
+
+
+    /** Returns null; this filter provides no cursor class */
+    @Override
+    protected Class<Id> getCursorClass() {
+        return null;
+    }
+
+
+    @Override
+    public Observable<Id> call( final Observable<Id> idObservable ) {
+
+        final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
+
+        return idObservable.flatMap( id -> {
+            //search for the edge with our edge name that runs from the emitted id to the target id
+            final SearchByEdge edgeSearch =
+                new SimpleSearchByEdge( id, getEdgeName(), targetId, Long.MAX_VALUE,
+                    SearchByEdgeType.Order.DESCENDING, Optional.absent() );
+
+            //one edge version is enough to validate existence, so take the first and emit its target node;
+            //nothing is emitted for this id when no version is found
+            return graphManager.loadEdgeVersions( edgeSearch )
+                .take( 1 ).map( edge -> edge.getTargetNode() );
+        } );
+    }
+
+
+    /**
+     * Get the name of the edge to be used in the seek
+     */
+    protected abstract String getEdgeName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
new file mode 100644
index 0000000..4bdcdad
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+
+
+/**
+ * Command for reading graph edges
+ */
+public abstract class AbstractReadGraphFilter extends AbstractFilter<Id, Edge>
+    implements TraverseFilter {
+
+    private final GraphManagerFactory graphManagerFactory;
+
+
+    /**
+     * Create a new instance of our command
+     * @param graphManagerFactory factory used to create the graph manager each time the filter is called
+     */
+    public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) {
+        this.graphManagerFactory = graphManagerFactory;
+    }
+
+
+    /**
+     * For every incoming id, searches the edges of our edge type name from that id, hands each edge to setCursor
+     * and emits the edge's target node.
+     */
+    @Override
+    public Observable<Id> call( final Observable<Id> observable ) {
+
+        final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
+
+        //constant state shared by every search built below
+        final Optional<Edge> resumeFrom = getCursor();
+        final String edgeType = getEdgeTypeName();
+
+        //return all ids that are emitted from this edge
+        return observable.flatMap( id -> {
+
+            final SimpleSearchByEdgeType search =
+                new SimpleSearchByEdgeType( id, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    resumeFrom );
+
+            /**
+             * TODO, pass a message with pointers to our cursor values to be generated later
+             */
+            return graphManager.loadEdgesFromSource( search )
+                .doOnNext( edge -> setCursor( edge ) )
+                .map( edge -> edge.getTargetNode() );
+        } );
+    }
+
+
+    /** The cursor class for this filter is Edge */
+    @Override
+    protected Class<Edge> getCursorClass() {
+        return Edge.class;
+    }
+
+
+
+    /**
+     * Get the edge type name we should use when traversing
+     * @return the edge type name passed to every edge search
+     */
+    protected abstract String getEdgeTypeName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
new file mode 100644
index 0000000..2ccad67
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * Read an edge in the graph to verify its existence by id
+ */
+public class ReadGraphCollectionByIdFilter extends AbstractReadGraphEdgeByIdFilter{
+    private final String collectionName;
+
+    @Inject
+    public ReadGraphCollectionByIdFilter( final GraphManagerFactory graphManagerFactory,
+                                          @Assisted final String collectionName, @Assisted final Id targetId ) {
+        super( graphManagerFactory, targetId );
+        this.collectionName = collectionName;
+    }
+
+    /** Derives the edge name from the collection name via CpNamingUtils.getEdgeTypeFromCollectionName */
+    @Override
+    protected String getEdgeName() {
+        return CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
new file mode 100644
index 0000000..3f0a70a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName;
+
+
+/**
+ * Command for reading graph edges on a collection
+ */
+public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
+    private final String collectionName;
+
+    /**
+     * Create a new instance of our command
+     */
+    @Inject
+    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory,
+                                      @Assisted final String collectionName ) {
+        super( graphManagerFactory );
+        this.collectionName = collectionName;
+    }
+
+    // NOTE(review): ReadGraphCollectionByIdFilter derives its edge name with CpNamingUtils.getEdgeTypeFromCollectionName,
+    // while this filter uses getCollectionScopeNameFromCollectionName -- confirm both yield the same edge type
+    @Override
+    protected String getEdgeTypeName() {
+        return getCollectionScopeNameFromCollectionName( collectionName );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
new file mode 100644
index 0000000..f16867d
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+
+/**
+ * Read an edge in the graph to verify its existence by id
+ */
+public class ReadGraphConnectionByIdFilter extends AbstractReadGraphEdgeByIdFilter{
+
+    private final String connectionName;
+
+    /**
+     * @param graphManagerFactory passed through to the parent filter
+     * @param connectionName connection name that getEdgeName turns into the edge name
+     * @param targetId passed through to the parent filter
+     */
+    @Inject
+    public ReadGraphConnectionByIdFilter( final GraphManagerFactory graphManagerFactory,
+                                          @Assisted final String connectionName, @Assisted final Id targetId ) {
+        super( graphManagerFactory, targetId );
+        this.connectionName = connectionName;
+    }
+
+
+    /**
+     * Derives the edge name from the connection name via CpNamingUtils.getEdgeTypeFromConnectionType
+     */
+    @Override
+    protected String getEdgeName() {
+        return CpNamingUtils.getEdgeTypeFromConnectionType( connectionName );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
new file mode 100644
index 0000000..3057111
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName;
+
+
+/**
+ * Command for reading graph edges on a connection, searching by edge name and entity type
+ */
+public class ReadGraphConnectionByTypeFilter extends AbstractFilter<Id, Edge> implements TraverseFilter {
+
+    private final GraphManagerFactory graphManagerFactory;
+    private final String connectionName;
+    private final String entityType;
+
+
+    /**
+     * Create a new instance of our command
+     *
+     * NOTE(review): both assisted parameters are Strings. Guice assisted injection normally needs named
+     * annotations such as @Assisted("connectionName") to tell same-typed parameters apart -- confirm against the factory
+     */
+    @Inject
+    public ReadGraphConnectionByTypeFilter( final GraphManagerFactory graphManagerFactory,
+                                            @Assisted final String connectionName, @Assisted final String entityType ) {
+        this.graphManagerFactory = graphManagerFactory;
+        this.connectionName = connectionName;
+        this.entityType = entityType;
+    }
+
+
+    /**
+     * For every incoming id, searches the edges for our connection's edge name and entity type from that id,
+     * hands each edge to setCursor and emits the edge's target node.
+     */
+    @Override
+    public Observable<Id> call( final Observable<Id> observable ) {
+
+        //get the graph manager
+        final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
+
+        //set our constant state
+        final Optional<Edge> startFromCursor = getCursor();
+
+        final String edgeName = getConnectionScopeName( connectionName );
+
+
+        //return all ids that are emitted from this edge
+        return observable.flatMap( id -> {
+
+            final SimpleSearchByIdType search =
+                new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    entityType, startFromCursor );
+
+            /**
+             * TODO, pass a message with pointers to our cursor values to be generated later
+             */
+            return graphManager.loadEdgesFromSourceByType( search ).doOnNext( edge -> setCursor( edge ) ).map(
+                edge -> edge.getTargetNode() );
+        } );
+    }
+
+
+    /** The cursor class for this filter is Edge */
+    @Override
+    protected Class<Edge> getCursorClass() {
+        return Edge.class;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
new file mode 100644
index 0000000..49360f6
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName;
+
+
+/**
+ * Command for reading graph edges on a connection
+ */
+public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
+    private final String connectionName;
+
+    /**
+     * Create a new instance of our command
+     */
+    @Inject
+    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory,
+                                      @Assisted final String connectionName ) {
+        super( graphManagerFactory );
+        this.connectionName = connectionName;
+    }
+
+    // NOTE(review): ReadGraphConnectionByIdFilter derives its edge name with CpNamingUtils.getEdgeTypeFromConnectionType,
+    // while this filter uses getConnectionScopeName -- confirm both yield the same edge type
+    @Override
+    protected String getEdgeTypeName() {
+        return getConnectionScopeName( connectionName );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
index ec4271a..663c15c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
@@ -24,8 +24,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.NoSuchElementException; -import org.apache.usergrid.corepersistence.command.CommandBuilder; -import org.apache.usergrid.corepersistence.command.read.entity.EntityLoadCollector; +import org.apache.usergrid.corepersistence.pipeline.DataPipeline; +import 
org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter; import org.apache.usergrid.persistence.EntityRef; import org.apache.usergrid.persistence.Results; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; @@ -82,14 +82,14 @@ public abstract class AbstractGraphQueryExecutor implements QueryExecutor { //assign them to an iterator. this now uses an internal buffer with backpressure, so we won't load all // results //set up our command builder - final CommandBuilder commandBuilder = new CommandBuilder( applicationScope, sourceId, requestCursor, limit ); + final DataPipeline dataPipeline = new DataPipeline( applicationScope, sourceId, requestCursor, limit ); - addTraverseCommand( commandBuilder ); + addTraverseCommand( dataPipeline ); //construct our results to be observed later. This is a cold observable final Observable<Results> resultsObservable = - commandBuilder.build( new EntityLoadCollector( entityCollectionManagerFactory, applicationScope ) ); + dataPipeline.build( new EntityLoadCollectorFilter( entityCollectionManagerFactory, applicationScope ) ); this.observableIterator = resultsObservable.toBlocking().getIterator(); @@ -129,5 +129,5 @@ public abstract class AbstractGraphQueryExecutor implements QueryExecutor { /** * Add the traverse command to the graph */ - protected abstract void addTraverseCommand( final CommandBuilder commandBuilder ); + protected abstract void addTraverseCommand( final DataPipeline dataPipeline ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java index 515fc2b..b506c71 
100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java @@ -20,8 +20,8 @@ package org.apache.usergrid.corepersistence.results; -import org.apache.usergrid.corepersistence.command.CommandBuilder; -import org.apache.usergrid.corepersistence.command.read.graph.ReadGraphCollectionCommand; +import org.apache.usergrid.corepersistence.pipeline.DataPipeline; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter; import org.apache.usergrid.persistence.EntityRef; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -53,8 +53,8 @@ public class CollectionGraphQueryExecutor extends AbstractGraphQueryExecutor { @Override - protected void addTraverseCommand( final CommandBuilder commandBuilder ) { + protected void addTraverseCommand( final DataPipeline dataPipeline ) { //set the traverse command from the source Id to the connect name - commandBuilder.withTraverseCommand( new ReadGraphCollectionCommand( graphManagerFactory, collectionName ) ); + dataPipeline.withTraverseCommand( new ReadGraphCollectionFilter( graphManagerFactory, collectionName ) ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java index 61726b7..3dc5cd6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java @@ -20,8 +20,8 @@ package org.apache.usergrid.corepersistence.results; -import org.apache.usergrid.corepersistence.command.CommandBuilder; -import org.apache.usergrid.corepersistence.command.read.graph.ReadGraphConnectionCommand; +import org.apache.usergrid.corepersistence.pipeline.DataPipeline; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter; import org.apache.usergrid.persistence.EntityRef; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -51,8 +51,8 @@ public class ConnectionGraphQueryExecutor extends AbstractGraphQueryExecutor { @Override - protected void addTraverseCommand( final CommandBuilder commandBuilder ) { + protected void addTraverseCommand( final DataPipeline dataPipeline ) { //set the traverse command from the source Id to the connect name - commandBuilder.withTraverseCommand( new ReadGraphConnectionCommand( graphManagerFactory, connectionName ) ); + dataPipeline.withTraverseCommand( new ReadGraphConnectionFilter( graphManagerFactory, connectionName ) ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java index d40efc0..99c0cdb 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java @@ -53,7 +53,7 
@@ public class SimpleSearchByEdge implements SearchByEdge { * @param maxTimestamp The maximum timestamp to seek from * @param last The value to start seeking from. Must be >= this value */ - public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp, final SearchByEdgeType.Order order, final Edge last ) { + public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp, final SearchByEdgeType.Order order, final Optional<Edge> last ) { ValidationUtils.verifyIdentity(sourceNode); ValidationUtils.verifyIdentity(targetNode); @@ -67,7 +67,7 @@ public class SimpleSearchByEdge implements SearchByEdge { this.type = type; this.maxTimestamp = maxTimestamp; this.order = order; - this.last = Optional.fromNullable(last); + this.last = last; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java index 4b73347..9d989a3 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java @@ -25,6 +25,8 @@ import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.SearchByIdType; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Optional; + /** * @@ -44,7 +46,7 @@ public class SimpleSearchByIdType extends SimpleSearchByEdgeType implements Sear * @param last The value to start seeking from. 
Must be >= this value */ - public SimpleSearchByIdType( final Id node, final String type, final long maxTimestamp, final Order order, final String idType, final Edge last ) { + public SimpleSearchByIdType( final Id node, final String type, final long maxTimestamp, final Order order, final String idType, final Optional<Edge> last ) { super( node, type, maxTimestamp, order, last ); ValidationUtils.verifyString( idType, "idType" ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java index cc0bb51..c81fa58 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java @@ -37,6 +37,8 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Optional; + /** * Simple class for edge testing generation @@ -190,7 +192,8 @@ public class EdgeTestUtils { */ public static SearchByEdge createGetByEdge( final Id sourceId, final String type, final Id targetId, final long maxVersion, final Edge last ) { - return new SimpleSearchByEdge( sourceId, type, targetId, maxVersion, SearchByEdgeType.Order.DESCENDING, last ); + return new SimpleSearchByEdge( sourceId, type, targetId, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional + .of( last ) ); } //