Repository: usergrid Updated Branches: refs/heads/master 32204b9fa -> 77b6573e4
Fixing https://issues.apache.org/jira/browse/USERGRID-1310. Also fixed an NPE found during the fix. (In the abstract connection service, a null entity caused an NPE; it now throws a 404.) Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b1157a89 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b1157a89 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b1157a89 Branch: refs/heads/master Commit: b1157a8924686557e5c26966c5a0b14fb87eb2d6 Parents: 10e8957 Author: Ayesha Dastagiri <[email protected]> Authored: Thu Aug 11 11:37:23 2016 -0700 Committer: Ayesha Dastagiri <[email protected]> Committed: Thu Aug 11 11:37:23 2016 -0700 ---------------------------------------------------------------------- .../corepersistence/CpRelationManager.java | 55 ++-- .../pipeline/builder/IdBuilder.java | 29 +- .../pipeline/read/FilterFactory.java | 8 + .../AbstractReadReverseGraphFilter.java | 291 +++++++++++++++++++ .../ReadGraphReverseConnectionFilter.java | 53 ++++ .../service/ConnectionSearch.java | 8 +- .../service/ConnectionServiceImpl.java | 9 +- .../org/apache/usergrid/persistence/Query.java | 39 ++- .../persistence/EntityConnectionsIT.java | 67 ++++- .../services/AbstractConnectionsService.java | 35 +-- .../usergrid/services/ConnectionsServiceIT.java | 74 ++++- 11 files changed, 559 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index b398562..57b1526 100644 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -17,69 +17,48 @@ package org.apache.usergrid.corepersistence; -import java.util.*; - +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.index.CollectionSettings; import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory; import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl; -import org.apache.usergrid.corepersistence.results.IdQueryExecutor; -import org.apache.usergrid.persistence.map.MapManager; -import org.apache.usergrid.persistence.map.MapScope; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.util.Assert; - -import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor; import org.apache.usergrid.corepersistence.results.EntityQueryExecutor; +import org.apache.usergrid.corepersistence.results.IdQueryExecutor; import org.apache.usergrid.corepersistence.service.CollectionSearch; import org.apache.usergrid.corepersistence.service.CollectionService; import org.apache.usergrid.corepersistence.service.ConnectionSearch; import org.apache.usergrid.corepersistence.service.ConnectionService; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.persistence.ConnectedEntityRef; -import org.apache.usergrid.persistence.ConnectionRef; -import org.apache.usergrid.persistence.Entity; -import org.apache.usergrid.persistence.EntityManager; -import org.apache.usergrid.persistence.EntityRef; -import 
org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.*; import org.apache.usergrid.persistence.Query.Level; -import org.apache.usergrid.persistence.RelationManager; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.RoleRef; -import org.apache.usergrid.persistence.Schema; -import org.apache.usergrid.persistence.SimpleEntityRef; -import org.apache.usergrid.persistence.SimpleRoleRef; import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.entities.Group; import org.apache.usergrid.persistence.entities.User; -import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.graph.GraphManager; -import org.apache.usergrid.persistence.graph.MarkedEdge; -import org.apache.usergrid.persistence.graph.SearchByEdge; -import org.apache.usergrid.persistence.graph.SearchByEdgeType; +import org.apache.usergrid.persistence.graph.*; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.index.query.Identifier; +import org.apache.usergrid.persistence.map.MapManager; +import org.apache.usergrid.persistence.map.MapScope; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.schema.CollectionInfo; import org.apache.usergrid.utils.InflectionUtils; import org.apache.usergrid.utils.MapUtils; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.Assert; import rx.Observable; -import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionEdge; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionEdge; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchByEdge; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType; +import java.util.*; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.*; import static org.apache.usergrid.persistence.Schema.*; import static org.apache.usergrid.utils.ClassUtils.cast; import static org.apache.usergrid.utils.InflectionUtils.singularize; @@ -954,7 +933,7 @@ public class CpRelationManager implements RelationManager { final Id sourceId = headEntity.asId(); final Optional<String> queryString = query.isGraphSearch()? Optional.<String>absent(): query.getQl(); - + final boolean isConnecting = query.isConnecting(); if ( query.getResultsLevel() == Level.REFS || query.getResultsLevel() == Level.IDS ) { @@ -968,7 +947,7 @@ public class CpRelationManager implements RelationManager { final ConnectionSearch search = new ConnectionSearch( applicationScope, sourceId, entityType, connection, toExecute.getLimit(), - queryString, cursor ); + queryString, cursor, isConnecting ); return connectionService.searchConnectionAsRefs( search ); } }.next(); @@ -983,7 +962,7 @@ public class CpRelationManager implements RelationManager { //we need the callback so as we get a new cursor, we execute a new search and re-initialize our builders final ConnectionSearch search = new ConnectionSearch( applicationScope, sourceId, entityType, connection, toExecute.getLimit(), - queryString, cursor ); + queryString, cursor, isConnecting ); return connectionService.searchConnection( search ); } }.next(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java 
---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java index 781d7d5..b7d1f86 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java @@ -20,9 +20,10 @@ package org.apache.usergrid.corepersistence.pipeline.builder; +import com.google.common.base.Optional; +import org.apache.usergrid.corepersistence.pipeline.Pipeline; import org.apache.usergrid.corepersistence.pipeline.PipelineOperation; import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; -import org.apache.usergrid.corepersistence.pipeline.Pipeline; import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter; @@ -30,13 +31,9 @@ import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefRe import org.apache.usergrid.corepersistence.pipeline.read.collect.IdResumeFilter; import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector; import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate; -import org.apache.usergrid.corepersistence.pipeline.read.traverse.IdFilter; import org.apache.usergrid.persistence.ConnectionRef; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; - import rx.Observable; @@ -69,6 +66,28 @@ public class IdBuilder { /** + * Traverse all connection edges to our input Id + * @param connectionName The name of the connection + * @param entityType The optional type of the entity + * @return + */ + 
public IdBuilder traverseReverseConnection( final String connectionName, final Optional<String> entityType ) { + + final PipelineOperation<FilterResult<Id>, FilterResult<Id>> filter; + + if(entityType.isPresent()){ + //todo: change this too. + filter = filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType.get() ); + }else{ + filter = filterFactory.readGraphReverseConnectionFilter( connectionName ); + } + + + return new IdBuilder( pipeline.withFilter(filter ), filterFactory ); + } + + + /** * Traverse all the collection edges from our input Id * @param collectionName * @return http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java index 883fdc8..4b615d8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java @@ -62,6 +62,14 @@ public interface FilterFactory { */ ReadGraphConnectionFilter readGraphConnectionFilter( final String connectionName ); + + /** + * Generate a new instance of the command with the specified parameters + * + * @param connectionName The connection name to use when reverse traversing the graph + */ + ReadGraphReverseConnectionFilter readGraphReverseConnectionFilter( final String connectionName ); + /** * Generate a new instance of the command with the specified parameters * http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java 
---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java new file mode 100644 index 0000000..dcda98f --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.read.traverse; + + +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; +import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; +import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; +import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.MarkedEdge; +import org.apache.usergrid.persistence.graph.SearchByEdgeType; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; + +import rx.Observable; +import rx.functions.Func1; + + +/** + * Command for reading graph edges in reverse order. 
+ */ +public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<Id, Id, MarkedEdge> { + + private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class ); + + private final GraphManagerFactory graphManagerFactory; + private final RxTaskScheduler rxTaskScheduler; + private final EventBuilder eventBuilder; + private final AsyncEventService asyncEventService; + + + /** + * Create a new instance of our command + */ + public AbstractReadReverseGraphFilter( final GraphManagerFactory graphManagerFactory, + final RxTaskScheduler rxTaskScheduler, + final EventBuilder eventBuilder, + final AsyncEventService asyncEventService ) { + this.graphManagerFactory = graphManagerFactory; + this.rxTaskScheduler = rxTaskScheduler; + this.eventBuilder = eventBuilder; + this.asyncEventService = asyncEventService; + } + + + @Override + public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) { + + + final ApplicationScope applicationScope = pipelineContext.getApplicationScope(); + + //get the graph manager + final GraphManager graphManager = + graphManagerFactory.createEdgeManager( applicationScope ); + + + final String edgeName = getEdgeTypeName(); + final EdgeState edgeCursorState = new EdgeState(); + + + //return all ids that are emitted from this edge + return previousIds.flatMap( previousFilterValue -> { + + //set our our constant state + final Optional<MarkedEdge> startFromCursor = getSeekValue(); + final Id id = previousFilterValue.getValue(); + + + final Optional<Edge> typeWrapper = Optional.fromNullable(startFromCursor.orNull()); + + /** + * We do not want to filter. This is intentional DO NOT REMOVE!!! + * + * We want to fire events on these edges if they exist, the delete was missed. 
+ */ + final SimpleSearchByEdgeType search = + new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + typeWrapper, false ); + + /** + * TODO, pass a message with pointers to our cursor values to be generated later + */ + return graphManager.loadEdgesToTarget( search ).filter(markedEdge -> { + + final boolean isDeleted = markedEdge.isDeleted(); + final boolean isSourceNodeDeleted = markedEdge.isSourceNodeDelete(); + final boolean isTargetNodeDelete = markedEdge.isTargetNodeDeleted(); + + + if (isDeleted) { + + logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge); + final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge); + + indexMessageObservable + .compose(applyCollector()) + .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) + .subscribe(); + + } + + if (isSourceNodeDeleted) { + + final Id sourceNodeId = markedEdge.getSourceNode(); + logger.info("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId); + + final EventBuilderImpl.EntityDeleteResults + entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId); + + entityDeleteResults.getIndexObservable() + .compose(applyCollector()) + .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) + .subscribe(); + + Observable.merge(entityDeleteResults.getEntitiesDeleted(), + entityDeleteResults.getCompactedNode()) + .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()). 
+ subscribe(); + + } + + if (isTargetNodeDelete) { + + final Id targetNodeId = markedEdge.getTargetNode(); + logger.info("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId); + + final EventBuilderImpl.EntityDeleteResults + entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId); + + entityDeleteResults.getIndexObservable() + .compose(applyCollector()) + .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) + .subscribe(); + + Observable.merge(entityDeleteResults.getEntitiesDeleted(), + entityDeleteResults.getCompactedNode()) + .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()). + subscribe(); + + } + + + //filter if any of them are marked + return !isDeleted && !isSourceNodeDeleted && !isTargetNodeDelete; + + + }) // any non-deleted edges should be de-duped here so the results are unique + .distinct( new EdgeDistinctKey() ) + //set the edge state for cursors + .doOnNext( edge -> { + if (logger.isTraceEnabled()) { + logger.trace("Seeking over edge {}", edge); + } + edgeCursorState.update( edge ); + } ) + + //map our id from the target edge and set our cursor every edge we traverse + .map( edge -> createFilterResult( edge.getSourceNode(), edgeCursorState.getCursorEdge(), + previousFilterValue.getPath() ) ); + } ); + } + + + @Override + protected FilterResult<Id> createFilterResult( final Id emit, final MarkedEdge cursorValue, + final Optional<EdgePath> parent ) { + + //if it's our first pass, there's no cursor to generate + if(cursorValue == null){ + return new FilterResult<>( emit, parent ); + } + + return super.createFilterResult( emit, cursorValue, parent ); + } + + + @Override + protected CursorSerializer<MarkedEdge> getCursorSerializer() { + return EdgeCursorSerializer.INSTANCE; + } + + + /** + * Get the edge type name we should use when traversing + */ + protected abstract String getEdgeTypeName(); + + + /** + * Wrapper class. 
Because edges seek > the last returned, we need to keep our n-1 value. This will be our cursor We + * always try to seek to the same position as we ended. Since we don't deal with a persistent read result, if we + * seek to a value = to our last, we may skip data. + */ + private final class EdgeState { + + private MarkedEdge cursorEdge = null; + private MarkedEdge currentEdge = null; + + + /** + * Update the pointers + */ + private void update( final MarkedEdge newEdge ) { + cursorEdge = currentEdge; + currentEdge = newEdge; + } + + + /** + * Get the edge to use in cursors for resume + */ + private MarkedEdge getCursorEdge() { + return cursorEdge; + } + } + + private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() { + + return observable -> observable + .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single)) + .filter(msg -> !msg.isEmpty()) + .doOnNext(indexOperation -> { + asyncEventService.queueIndexOperationMessage(indexOperation); + }); + + } + + /** + * Return a key that Rx can use for determining a distinct edge. Build a string containing the UUID + * of the source and target nodes, with the type to ensure uniqueness rather than the int sum of the hash codes. + * Edge timestamp is specifically left out as edges with the same source,target,type but different timestamps + * are considered duplicates. 
+ */ + private class EdgeDistinctKey implements Func1<Edge,String> { + + @Override + public String call(Edge edge) { + + return buildDistinctKey(edge.getSourceNode().getUuid().toString(), edge.getTargetNode().getUuid().toString(), + edge.getType().toLowerCase()); + } + } + + protected static String buildDistinctKey(final String sourceNode, final String targetNode, final String type){ + + final String DISTINCT_KEY_SEPARATOR = ":"; + StringBuilder stringBuilder = new StringBuilder(); + + stringBuilder + .append(sourceNode) + .append(DISTINCT_KEY_SEPARATOR) + .append(targetNode) + .append(DISTINCT_KEY_SEPARATOR) + .append(type); + + return stringBuilder.toString(); + + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphReverseConnectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphReverseConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphReverseConnectionFilter.java new file mode 100644 index 0000000..aa369c2 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphReverseConnectionFilter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.traverse; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; +import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; +import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType; + +/** + * Created by ayeshadastagiri on 8/9/16. 
+ */ +public class ReadGraphReverseConnectionFilter extends AbstractReadReverseGraphFilter{ + private final String connectionName; + + /** + * Create a new instance of our command + */ + @Inject + public ReadGraphReverseConnectionFilter( final GraphManagerFactory graphManagerFactory, + @AsyncRepair final RxTaskScheduler rxTaskScheduler, + final EventBuilder eventBuilder, + final AsyncEventService asyncEventService, + @Assisted final String connectionName ) { + super( graphManagerFactory, rxTaskScheduler, eventBuilder, asyncEventService ); + this.connectionName = connectionName; + } + @Override + protected String getEdgeTypeName() { + return getEdgeTypeFromConnectionType( connectionName ); } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java index 51f6768..8ad57fb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java @@ -36,11 +36,12 @@ public class ConnectionSearch { private final int limit; private final Optional<String> query; private final Optional<String> cursor; + private final boolean isConnecting; public ConnectionSearch( final ApplicationScope applicationScope, final Id sourceNodeId, final Optional<String> entityType, final String connectionName, final int limit, final Optional<String> query, final - Optional<String> cursor ) { + Optional<String> cursor, boolean isConnecting ) { this.applicationScope = applicationScope; this.sourceNodeId = sourceNodeId; this.entityType = entityType; @@ -48,6 +49,7 @@ public class ConnectionSearch { this.limit = 
limit; this.query = query; this.cursor = cursor; + this.isConnecting = isConnecting; } @@ -84,4 +86,8 @@ public class ConnectionSearch { public Optional<String> getEntityType() { return entityType; } + + public boolean getIsConnecting(){ + return isConnecting; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java index 4b7e66c..926c676 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java @@ -94,8 +94,13 @@ public class ConnectionServiceImpl implements ConnectionService { if ( !query.isPresent() ) { - results = - pipelineBuilder.traverseConnection( search.getConnectionName(), search.getEntityType() ).loadEntities(); + if(search.getIsConnecting()){ + results = pipelineBuilder.traverseReverseConnection(search.getConnectionName(), search.getEntityType()).loadEntities(); + } + else { + results = + pipelineBuilder.traverseConnection(search.getConnectionName(), search.getEntityType()).loadEntities(); + } } else { http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java index 150a1b0..d68c085 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java @@ 
-19,36 +19,25 @@ package org.apache.usergrid.persistence; -import java.io.IOException; -import java.io.Serializable; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.codec.binary.Base64; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; import org.apache.commons.lang.StringUtils; - import org.apache.usergrid.persistence.index.SelectFieldMapping; import org.apache.usergrid.persistence.index.exceptions.QueryParseException; import org.apache.usergrid.persistence.index.query.CounterResolution; import org.apache.usergrid.persistence.index.query.Identifier; import org.apache.usergrid.persistence.index.query.tree.Operand; import org.apache.usergrid.persistence.index.utils.ClassUtils; -import org.apache.usergrid.persistence.index.utils.ConversionUtils; import org.apache.usergrid.persistence.index.utils.ListUtils; import org.apache.usergrid.persistence.index.utils.MapUtils; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Optional; +import java.io.IOException; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.*; +import java.util.Map.Entry; public class Query { @@ -82,6 +71,7 @@ public class Query { private Long startTime; private Long finishTime; private boolean pad; + private boolean connecting = false; private CounterResolution resolution = CounterResolution.ALL; private List<Identifier> identifiers; private List<CounterFilterPredicate> counterFilters; @@ -611,6 +601,15 @@ public class Query { this.pad = pad; } + //set the flag to retrieve the 
edges in the reverse direction. + public void setConnecting( boolean connecting ) { + this.connecting = connecting; + } + + public boolean isConnecting() { + return connecting; + } + public void setResolution( CounterResolution resolution ) { this.resolution = resolution; http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java index be2f06e..3d4e53c 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java @@ -17,24 +17,17 @@ package org.apache.usergrid.persistence; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; - +import org.apache.usergrid.AbstractCoreIT; +import org.apache.usergrid.persistence.Query.Level; +import org.apache.usergrid.persistence.entities.User; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.usergrid.AbstractCoreIT; -import org.apache.usergrid.persistence.entities.User; -import org.apache.usergrid.persistence.Query.Level; +import java.util.*; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class EntityConnectionsIT extends AbstractCoreIT { private static final Logger logger = LoggerFactory.getLogger( EntityConnectionsIT.class ); @@ -335,6 +328,54 @@ public class EntityConnectionsIT extends AbstractCoreIT { assertEquals( "user", res.getEntity().getType() ); } + 
//not required: equivalent tests are added at the service layer.
+    @Ignore
+    @Test
+    public void testGetConnectingEntitiesCursor() throws Exception {
+
+        UUID applicationId = app.getId( );
+        assertNotNull( applicationId );
+
+        EntityManager em = app.getEntityManager();
+        assertNotNull( em );
+
+        User fred = new User();
+        fred.setUsername( "fred" );
+        fred.setEmail( "[email protected]" );
+        Entity fredEntity = em.create( fred );
+        assertNotNull( fredEntity );
+
+        User wilma = new User();
+        wilma.setUsername( "wilma" );
+        wilma.setEmail( "[email protected]" );
+        Entity wilmaEntity = em.create( wilma );
+        assertNotNull( wilmaEntity );
+
+        User john = new User();
+        john.setUsername( "John" );
+        john.setEmail( "[email protected]" );
+        Entity johnEntity = em.create( john );
+        assertNotNull( johnEntity );
+
+        em.createConnection( fredEntity, "likes", wilmaEntity );
+        em.createConnection( fredEntity, "likes", johnEntity );
+
+
+        app.refreshIndex();
+
+        // now query via the testConnection, this should work
+
+        Query query = Query.fromQLNullSafe("" );
+        query.setConnectionType( "likes" );
+//        query.setConnecting(true);
+        query.setEntityType( "user" );
+
+        // NOTE(review): originally described as going through "traverseReverseConnection"; with setConnecting disabled above, confirm which path actually runs
+        Results r = em.searchTargetEntities(fredEntity, query);
+
+        assertEquals( 2, r.size() );
+    }
+
+
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
index 83549dd..0a9f6a7 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
@@ -17,21 +17,8 @@ package org.apache.usergrid.services;
-import
java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.ConnectionRef; -import org.apache.usergrid.persistence.Entity; -import org.apache.usergrid.persistence.EntityRef; -import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.*; import org.apache.usergrid.persistence.Query.Level; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.Schema; -import org.apache.usergrid.persistence.SimpleEntityRef; import org.apache.usergrid.persistence.index.query.Identifier; import org.apache.usergrid.services.ServiceParameter.IdParameter; import org.apache.usergrid.services.ServiceParameter.NameParameter; @@ -39,10 +26,15 @@ import org.apache.usergrid.services.ServiceParameter.QueryParameter; import org.apache.usergrid.services.ServiceResults.Type; import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException; import org.apache.usergrid.services.exceptions.UnsupportedServiceOperationException; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Observable; import rx.schedulers.Schedulers; +import java.util.List; +import java.util.Set; +import java.util.UUID; + import static org.apache.usergrid.services.ServiceParameter.filter; import static org.apache.usergrid.services.ServiceParameter.firstParameterIsName; import static org.apache.usergrid.utils.ClassUtils.cast; @@ -307,6 +299,7 @@ public class AbstractConnectionsService extends AbstractService { Results r = null; if ( connecting() ) { + query.setConnecting(true); if ( query.hasQueryPredicates() ) { if (logger.isTraceEnabled()) { logger.trace("Attempted query of backwards connections"); @@ -314,13 +307,7 @@ public class AbstractConnectionsService extends AbstractService { return null; } else { -// r = em.getSourceEntities( context.getOwner().getUuid(), query.getConnectionType(), -// query.getEntityType(), level ); - // 
usergrid-2389: User defined limit in the query is ignored. Fixed it by adding - // the limit to the method parameter downstream. - r = em.getSourceEntities( - new SimpleEntityRef(context.getOwner().getType(), context.getOwner().getUuid()), - query.getConnectionType(), query.getEntityType(), level, query.getLimit()); + r = em.searchTargetEntities(context.getOwner(),query); } } else { @@ -381,6 +368,10 @@ public class AbstractConnectionsService extends AbstractService { } else { entity = em.create( query.getEntityType(), context.getProperties() ); + //if entity is null here it throws NPE. Fixing it to throw 404. + if ( entity == null ) { + throw new ServiceResourceNotFoundException( context ); + } } entity = importEntity( context, entity ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java ---------------------------------------------------------------------- diff --git a/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java index a1f19d4..4e65f54 100644 --- a/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java +++ b/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java @@ -17,19 +17,17 @@ package org.apache.usergrid.services; -import java.util.Map; - +import org.apache.usergrid.persistence.Entity; +import org.apache.usergrid.persistence.Query; import org.junit.Assert; import org.junit.Test; - -import org.apache.usergrid.persistence.Entity; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; + +import static org.junit.Assert.*; + public class ConnectionsServiceIT extends AbstractServiceIT { @@ -86,6 +84,66 @@ public class 
ConnectionsServiceIT extends AbstractServiceIT {
         app.testRequest( ServiceAction.POST, 1, "users", "conn-user1", "manages", "user" );
     }
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testUserConnectionsCursor() throws Exception {
+        app.put("username", "conn-user1");
+        app.put("email", "[email protected]");
+
+        Entity user1 = app.testRequest(ServiceAction.POST, 1, "users").getEntity();
+        assertNotNull(user1);
+
+        app.testRequest(ServiceAction.GET, 1, "users", "conn-user1");
+
+        app.put("username", "conn-user2");
+        app.put("email", "[email protected]");
+
+        Entity user2 = app.testRequest(ServiceAction.POST, 1, "users").getEntity();
+        assertNotNull(user2);
+
+
+        app.put("username", "conn-user3");
+        app.put("email", "[email protected]");
+
+        Entity user3 = app.testRequest(ServiceAction.POST, 1, "users").getEntity();
+        assertNotNull(user3);
+
+
+        //POST users/conn-user2/likes/users/conn-user1
+        app.testRequest(ServiceAction.POST, 1, "users", "conn-user2", "likes", "users", "conn-user1");
+        //POST users/conn-user3/likes/users/conn-user1
+        app.testRequest(ServiceAction.POST, 1, "users", "conn-user3", "likes", "users", "conn-user1");
+
+        Query query = Query.fromQLNullSafe("");
+        query.setLimit(1);
+
+        //the first page (limit 1) should hold one entity and return a valid cursor.
+        ServiceResults result = app.testRequest(ServiceAction.GET, 1, "users", "conn-user1", "connecting", "likes",query);
+        assertNotNull(result.getCursor());
+        String email1 = result.getEntity().getProperty("email").toString();
+
+        //fetch the second page by resuming the same limit-1 query from the returned cursor.
+        query.setCursor(result.getCursor());
+        result = app.testRequest(ServiceAction.GET,1,"users","conn-user1","connecting","likes",query);
+        String email2 = result.getEntity().getProperty("email").toString();
+
+        //ensure the two entities returned in above requests are different.
+        assertNotEquals(email1,email2);
+
+        //a third page must be empty: both connecting users were already returned.
+        query.setCursor(result.getCursor());
+        result = app.testRequest(ServiceAction.GET,0,"users","conn-user1","connecting","likes",query);
+        //return empty cursor when no more entities found.
+        assertNull(result.getCursor());
+
+        //DELETE users/conn-user2/likes/users/conn-user1 (qualified by collection type on second entity)
+        app.testRequest(ServiceAction.DELETE, 1, "users", "conn-user2", "likes", "users", "conn-user1");
+
+        app.testRequest(ServiceAction.GET,1,"users","conn-user1","connecting","likes");
+
+
+    }
+
     @Test
     public void testNonExistentEntity() throws Exception {
