WIP squash
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b144cc2b Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b144cc2b Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b144cc2b Branch: refs/heads/USERGRID-593 Commit: b144cc2bc6f77d35286bad3bcea8159f17cc4fc5 Parents: 9f6fa27 Author: Todd Nine <tn...@apigee.com> Authored: Thu Apr 23 19:07:34 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Thu Apr 23 19:07:34 2015 -0600 ---------------------------------------------------------------------- stack/core/pom.xml | 12 -- .../corepersistence/CpEntityManagerFactory.java | 2 +- .../corepersistence/CpRelationManager.java | 9 +- .../usergrid/corepersistence/CpWalker.java | 4 +- .../corepersistence/command/CommandBuilder.java | 60 ++++++- .../command/cursor/CursorCache.java | 37 ----- .../command/cursor/CursorSerializer.java | 86 ++++++++++ .../command/cursor/RequestCursor.java | 45 ++++++ .../command/cursor/ResponseCursor.java | 56 +++++++ .../command/read/AbstractCommand.java | 111 +++++++++++++ .../command/read/CollectCommand.java | 6 + .../corepersistence/command/read/Command.java | 28 +++- .../command/read/entity/EntityLoadCommand.java | 158 +++++++++++++++++++ .../read/graph/AbstractReadGraphCommand.java | 101 ++++++++++++ .../read/graph/ReadGraphCollectionCommand.java | 53 +++++++ .../read/graph/ReadGraphConnectionCommand.java | 49 ++++++ .../usergrid/corepersistence/cursor/Cursor.java | 42 ----- .../corepersistence/cursor/CursorBuilder.java | 28 ---- .../cursor/CursorSerializer.java | 112 ------------- .../corepersistence/graph/GraphCursor.java | 65 -------- .../corepersistence/graph/GraphOperations.java | 66 -------- .../CollectionResultsLoaderFactoryImpl.java | 1 - .../results/GraphQueryExecutor.java | 152 ++++++++++++++++++ .../rx/ApplicationObservable.java | 42 ++--- .../rx/EdgesFromSourceObservable.java | 4 +- .../rx/EdgesToTargetObservable.java | 4 +- .../persistence/collection/EntitySet.java | 14 +- .../serialization/impl/EntitySetImpl.java | 10 +- .../persistence/core/rx/OrderedMerge.java | 28 ++-- .../graph/impl/SimpleSearchByEdgeType.java | 26 ++- .../impl/stage/NodeDeleteListenerImpl.java | 5 +- .../impl/shard/DirectedEdgeMeta.java | 7 +- .../persistence/graph/GraphManagerLoadTest.java | 6 +- .../graph/GraphManagerShardConsistencyIT.java | 3 +- .../graph/GraphManagerStressTest.java | 8 +- stack/corepersistence/pom.xml | 2 +- .../scenarios/ConnectionScenarios.scala | 2 +- stack/pom.xml | 1 - 38 files changed, 1005 insertions(+), 440 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/pom.xml ---------------------------------------------------------------------- diff --git a/stack/core/pom.xml b/stack/core/pom.xml index d91208d..f12c0b8 100644 --- a/stack/core/pom.xml +++ b/stack/core/pom.xml @@ -473,18 +473,6 @@ </dependency> <dependency> - <groupId>com.netflix.rxjava</groupId> - <artifactId>rxjava-core</artifactId> - <version>${rx.version}</version> - </dependency> - - <dependency> - <groupId>com.netflix.rxjava</groupId> - <artifactId>rxjava-math</artifactId> - <version>${rx.version}</version> - </dependency> - - <dependency> <groupId>com.clearspring.analytics</groupId> <artifactId>stream</artifactId> <version>2.7.0</version> http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index fe4d828..b12b6ce 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -427,7 +427,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType( fromEntityId, edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, null )); + SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() )); Iterator<Edge> iter = edges.toBlockingObservable().getIterator(); while ( iter.hasNext() ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index da39ea9..2ee136e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -106,6 +106,7 @@ import org.apache.usergrid.utils.MapUtils; import org.apache.usergrid.utils.UUIDUtils; import com.codahale.metrics.Timer; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import me.prettyprint.hector.api.Keyspace; @@ -331,7 +332,7 @@ public class CpRelationManager implements RelationManager { public Observable<Edge> call( final String edgeType ) { return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, null ) ); + SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } } ); @@ -400,7 +401,7 @@ public class CpRelationManager implements RelationManager { public Observable<Edge> call( final String etype ) { return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, null ) ); + SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } } ) @@ -516,7 +517,7 @@ public class CpRelationManager implements RelationManager { CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ), System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, - null ) ); // last + Optional.<Edge>absent() ) ); // last Iterator<Edge> iterator = edgesToTarget.toBlockingObservable().getIterator(); int count = 0; @@ -541,7 +542,7 @@ public class CpRelationManager implements RelationManager { CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ), System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, - null ) ); // last + Optional.<Edge>absent() ) ); // last int count = edgesFromSource.take( 2 ).count().toBlocking().last(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java index 4b902d8..b2354a6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java @@ -31,6 +31,8 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; +import com.google.common.base.Optional; + import rx.Observable; import rx.functions.Action1; import rx.functions.Func1; @@ -111,7 +113,7 @@ public class CpWalker { logger.debug( "Loading edges of type {} from node {}", edgeType, applicationId ); return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( - applicationId, edgeType, Long.MAX_VALUE, order , null ) ); + applicationId, edgeType, Long.MAX_VALUE, order , Optional.<Edge>absent() ) ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java index f40c1d5..1d66d3a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java @@ -20,9 +20,17 @@ package org.apache.usergrid.corepersistence.command; +import org.apache.usergrid.corepersistence.command.cursor.RequestCursor; +import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor; import org.apache.usergrid.corepersistence.command.read.CollectCommand; +import org.apache.usergrid.corepersistence.command.read.Command; import org.apache.usergrid.corepersistence.command.read.TraverseCommand; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; + +import rx.Observable; /** @@ -32,24 +40,66 @@ public class CommandBuilder { private final ApplicationScope applicationScope; + private final RequestCursor requestCursor; + private final ResponseCursor responseCursor; + + private int count = 0; + + private Observable<Id> currentObservable; + + + /** + * Our first pass, where we implement our start point as an Id until we can use this to perform our entire + * traversal. Eventually as we untangle the existing Query service nightmare, the sourceId will be remove and should + * only be traversed from the root application + */ + public CommandBuilder(final ApplicationScope applicationScope, final Id sourceId, final Optional<String> requestCursor ) { + + this.applicationScope = applicationScope; + + //set the request cursor + this.requestCursor = new RequestCursor( requestCursor ); + + //set the response cursor + this.responseCursor = new ResponseCursor(); - public CommandBuilder( final ApplicationScope applicationScope ) {this.applicationScope = applicationScope;} + this.currentObservable = Observable.just( sourceId ); + } /** * Add a read command that will read Ids and produce Ids. This is an intermediate traversal operations - * @param traverseCommand - * @return */ - public CommandBuilder withTraverseCommand(final TraverseCommand traverseCommand ){ + public CommandBuilder withTraverseCommand( final TraverseCommand traverseCommand ) { + + setState( traverseCommand ); + + this.currentObservable = currentObservable.compose( traverseCommand ); return this; } - public <T> T build(final CollectCommand<T> collectCommand ){ + /** + * Build the final collection step, and + */ + public <T> Observable<T> build( final CollectCommand<T> collectCommand ) { + setState( collectCommand ); + + return currentObservable.compose( collectCommand ); } + /** + * Set the id of the state + * @param command + */ + private void setState( Command<?> command ) { + command.setId( count ); + //done for clarity + count++; + + command.setCursorCaches( requestCursor, responseCursor ); + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorCache.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorCache.java deleted file mode 100644 index 4080958..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorCache.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.command.cursor; - - -import org.apache.usergrid.corepersistence.cursor.Cursor; - - -public class CursorCache { - - - /** - * Get the cursor with the specified id - * @param id - * @return - */ - public Cursor getCursor(final String id){ - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java new file mode 100644 index 0000000..b45e7da --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.cursor; + + +import java.io.Serializable; + +import com.fasterxml.jackson.core.Base64Variant; +import com.fasterxml.jackson.core.Base64Variants; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; + + +/** + * A utility to serialize objects to/from cursors + */ +public class CursorSerializer { + + + private static final SmileFactory SMILE_FACTORY = new SmileFactory(); + + private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY ); + + private static final Base64Variant VARIANT = Base64Variants.MODIFIED_FOR_URL; + + + /** + * Serialize the serializable object as a cursor + */ + public static String asCursor( final Serializable cursor ) { + + try { + return MAPPER.writer( VARIANT ).writeValueAsString( cursor ); + } + catch ( JsonProcessingException e ) { + throw new CursorParseException( "Unable to serialize cursor", e ); + } + } + + + /** + * Deserialize from the cursor + * @param cursor + * @return + * @throws CursorParseException + */ + public <T extends Serializable> T fromCursor( final String cursor, final Class<T> cursorClass ) throws CursorParseException { + try { + + final JsonParser parser = MAPPER.getFactory().createParser( cursor ); + return MAPPER.reader( VARIANT ).readValue( parser, cursorClass); + } + catch ( Exception e ) { + throw new CursorParseException( "Unable to serialize cursor", e ); + } + } + + + /** + * Thrown when we can't parse a cursor + */ + public static class CursorParseException extends RuntimeException { + public CursorParseException( final String message, final Throwable cause ) { + super( message, cause ); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java new file mode 100644 index 0000000..60d54ff --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.cursor; + + +import java.io.Serializable; + +import com.google.common.base.Optional; + + +/** + * A cursor that has been passed in with our request. Adds utils for parsing values + */ +public class RequestCursor { + + public RequestCursor(final Optional<String> cursor){ + + } + + + /** + * Get the cursor with the specified id + */ + public <T extends Serializable> T getCursor( final int id, final Class<T> cursorType ) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java new file mode 100644 index 0000000..02aae34 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.cursor; + + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + + +/** + * A cursor used in rendering a response + */ +public class ResponseCursor { + + + + /** + * We use a map b/c some indexes might be skipped + */ + private Map<Integer, ? super Serializable> cursors = new HashMap<>(); + + /** + * Set the possible cursor value into the index. DOES NOT parse the cursor. This is intentional for performance + */ + public <T extends Serializable> void setCursor( final int id, final T cursor ) { + cursors.put( id, cursor ); + } + + + private void ensureCapacity() { + + } + + + public String encodeAsString() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java new file mode 100644 index 0000000..59e1848 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read; + + +import java.io.Serializable; + +import javax.xml.ws.Response; + +import org.apache.usergrid.corepersistence.command.cursor.RequestCursor; +import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + +import com.google.common.base.Optional; + + +/** + * Basic functionality for our commands to handle cursor IO + */ +public abstract class AbstractCommand<T, C extends Serializable> implements Command<T> { + + private int id; + /** + * The cache of the cursor that was set when the read was started + */ + private RequestCursor readCache; + + /** + * The current state of the write cache. Gets updated as we traverse the observables + */ + private ResponseCursor writeCache; + + + /** + * The applicationScope + */ + protected ApplicationScope applicationScope; + + + @Override + public void setId( final int id ) { + this.id = id; + } + + + @Override + public void setCursorCaches( final RequestCursor readCache, final ResponseCursor writeCache ) { + this.readCache = readCache; + this.writeCache = writeCache; + } + + + @Override + public void setApplicationScope( final ApplicationScope applicationScope ) { + this.applicationScope = applicationScope; + } + + + /** + * Return the parsed value of the cursor from the last request, if it exists + */ + protected Optional<C> getCursor() { + final C cursor = readCache.getCursor( id, getCursorClass() ); + + return Optional.fromNullable( cursor ); + } + + + + + + /** + * Set the cursor value into the new cursor write cache + * @param newValue + */ + protected void setCursor(final C newValue){ + writeCache.setCursor( id, newValue ); + } + + + /** + * Generate our state as a cursor + * @return + */ + protected String generateCursor(){ + return writeCache.encodeAsString(); + } + + /** + * Return the class to be used when parsing the cursor + */ + protected abstract Class<C> getCursorClass(); + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java index 923cef9..5e982b6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java @@ -26,5 +26,11 @@ package org.apache.usergrid.corepersistence.command.read; */ public interface CollectCommand<T> extends Command<T>{ + /** + * Set the prefered result size for the command + * @param resultSize + */ + void setResultSize(final int resultSize); + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java index 1b8110d..ada47d2 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java @@ -20,25 +20,37 @@ package org.apache.usergrid.corepersistence.command.read; -import org.apache.commons.collections4.Transformer; - -import org.apache.usergrid.corepersistence.command.cursor.CursorCache; -import org.apache.usergrid.corepersistence.cursor.Cursor; +import org.apache.usergrid.corepersistence.command.cursor.RequestCursor; +import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Id; +import rx.Observable; + /** - * Interface for a read command. This takes an input of Id, performs some operation, and emits - * Id for further processing + * Interface for a read command. This takes an input of Id, performs some operation, and emits Id for further + * processing */ -public interface Command<T> extends Transformer<Id, T> { +public interface Command<T> extends Observable.Transformer<Id, T> { /** + * Set the id of this command in it's execution environment + */ + void setId( final int id ); + + /** * Set the cursor cache into the command + * * @param readCache Set the cache that was used in the request * @param writeCache Set the cache to be used when writing the results */ - void setCursorCaches( final CursorCache readCache, final CursorCache writeCache ); + void setCursorCaches( final RequestCursor readCache, final ResponseCursor writeCache ); + /** + * Set the application scope of the command + * @param applicationScope + */ + void setApplicationScope(final ApplicationScope applicationScope); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java new file mode 100644 index 0000000..3b6cade --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.entity; + + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.usergrid.corepersistence.command.read.AbstractCommand; +import org.apache.usergrid.corepersistence.command.read.CollectCommand; +import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; +import org.apache.usergrid.persistence.EntityFactory; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.collection.CollectionScope; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; +import rx.functions.Func1; + + +/** + * Loads entities from a set of Ids. + * + * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification + */ +public class EntityLoadCommand extends AbstractCommand<Results, Serializable> implements CollectCommand<Results> { + + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + + //TODO get rid of this when merged into 2.0 dev + private final CollectionScope collectionScope; + private int resultSize; + + + public EntityLoadCommand( final EntityCollectionManagerFactory entityCollectionManagerFactory, + final CollectionScope collectionScope ) { + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.collectionScope = collectionScope; + } + + + @Override + protected Class<Serializable> getCursorClass() { + return null; + } + + + @Override + public Observable<Results> call( final Observable<? extends Id> observable ) { + + final EntityCollectionManager ecm = + entityCollectionManagerFactory.createCollectionManager( this.collectionScope ); + + return observable.buffer( resultSize ).flatMap( new Func1<List<? extends Id>, Observable<Results>>() { + @Override + public Observable<Results> call( final List<? extends Id> ids ) { + + //load the entities + final Observable<EntitySet> entities = ecm.load( ( Collection<Id> ) ids ); + + + return entities.flatMap( new Func1<EntitySet, Observable<Results>>() { + @Override + public Observable<Results> call( final EntitySet entitySet ) { + return createResults( entitySet ); + } + } ); + } + } ); + } + + + /** + * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results + * objects + */ + public Observable<Results> createResults( final EntitySet entitySet ) { + + + return Observable.from( entitySet.getEntities() ).map( + new Func1<MvccEntity, org.apache.usergrid.persistence.Entity>() { + + @Override + public org.apache.usergrid.persistence.Entity call( final MvccEntity mvccEntity ) { + return mapEntity( mvccEntity ); + } + } ) + //filter null entities + .filter( new Func1<org.apache.usergrid.persistence.Entity, Boolean>() { + @Override + public Boolean call( final org.apache.usergrid.persistence.Entity entity ) { + return entity == null; + } + } ) + //buffer them and put them in as a map + .toList().map( new Func1<List<org.apache.usergrid.persistence.Entity>, Results>() { + @Override + public Results call( final List<org.apache.usergrid.persistence.Entity> entities ) { + final Results results = Results.fromEntities( entities ); + results.setCursor( generateCursor() ); + + return results; + } + } ); + } + + + /** + * Map a new cp entity to an old entity. May be null if not present + */ + private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity mvccEntity ) { + if ( !mvccEntity.getEntity().isPresent() ) { + return null; + } + + + final Entity cpEntity = mvccEntity.getEntity().get(); + final Id entityId = cpEntity.getId(); + + org.apache.usergrid.persistence.Entity entity = + EntityFactory.newEntity( entityId.getUuid(), entityId.getType() ); + + Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity ); + entity.addProperties( entityMap ); + + return entity; + } + + + @Override + public void setResultSize( final int resultSize ) { + this.resultSize = resultSize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java new file mode 100644 index 0000000..f637510 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.graph; + + +import org.apache.usergrid.corepersistence.command.read.AbstractCommand; +import org.apache.usergrid.corepersistence.command.read.TraverseCommand; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.SearchByEdgeType; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; + +import rx.Observable; +import rx.functions.Func1; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName; + + +/** + * Command for reading graph edges + */ +public abstract class AbstractReadGraphCommand extends AbstractCommand<Id, Edge> implements TraverseCommand { + + private final GraphManagerFactory graphManagerFactory; + + + /** + * Create a new instance of our command + * @param graphManagerFactory + */ + public AbstractReadGraphCommand( final GraphManagerFactory graphManagerFactory ) { + this.graphManagerFactory = graphManagerFactory; + } + + + @Override + public Observable<Id> call( final Observable<? extends Id> observable ) { + + //get the graph manager + final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope ); + + //set our our constant state + final Optional<Edge> startFromCursor = getCursor(); + + final String edgeName = getEdgeTypeName(); + + + //return all ids that are emitted from this edge + return observable.flatMap( new Func1<Id, Observable<Id>>() { + + @Override + public Observable<Id> call( final Id id ) { + + final SimpleSearchByEdgeType search = new SimpleSearchByEdgeType(id,edgeName, Long.MAX_VALUE, + SearchByEdgeType.Order.DESCENDING, startFromCursor ); + + return graphManager.loadEdgesFromSource( search ).map( new Func1<Edge, Id>() { + @Override + public Id call( final Edge edge ) { + return edge.getTargetNode(); + } + } ); + } + } ); + } + + + @Override + protected Class<Edge> getCursorClass() { + return Edge.class; + } + + + + /** + * Get the edge type name we should use when traversing + * @return + */ + protected abstract String getEdgeTypeName(); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java new file mode 100644 index 0000000..aec0d8b --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.graph; + + +import org.apache.usergrid.persistence.graph.GraphManagerFactory; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName; + + +/** + * Command for reading graph edges on a collection + */ +public class ReadGraphCollectionCommand extends AbstractReadGraphCommand{ + + private final String collectionName; + + + /** + * Create a new instance of our command + * @param graphManagerFactory + * @param collectionName + */ + public ReadGraphCollectionCommand( final GraphManagerFactory graphManagerFactory, final String collectionName ) { + super(graphManagerFactory); + this.collectionName = collectionName; + } + + + + + @Override + protected String getEdgeTypeName() { + return getCollectionScopeNameFromCollectionName(collectionName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java new file mode 100644 index 0000000..adebd45 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.graph; + + +import org.apache.usergrid.persistence.graph.GraphManagerFactory; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName; + + +/** + * Command for reading graph edges on a connection + */ +public class ReadGraphConnectionCommand extends AbstractReadGraphCommand { + + private final String connectionName; + + + /** + * Create a new instance of our command + */ + public ReadGraphConnectionCommand( final GraphManagerFactory graphManagerFactory, final String connectionName ) { + super( graphManagerFactory ); + this.connectionName = connectionName; + } + + + @Override + protected String getEdgeTypeName() { + return getConnectionScopeName( connectionName ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/Cursor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/Cursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/Cursor.java deleted file mode 100644 index 4ec8d81..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/Cursor.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.cursor; - - -import java.io.Serializable; - - -/** - * Interface for cursors - */ -public interface Cursor<T extends Serializable> { - - /** - * Get the type of the cursor - * @return - */ - String getId(); - - /** - * Return the cursor to be used for serialization - * @return - */ - T getCursor(); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorBuilder.java deleted file mode 100644 index 26ca5a3..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorBuilder.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.cursor; - - -public class CursorBuilder { - - public static <T> T getCursor(final String cursor, Class<T> cursorClass){ - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorSerializer.java deleted file mode 100644 index 0cf9fa0..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorSerializer.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.cursor; - - -import java.io.Serializable; - -import com.fasterxml.jackson.core.Base64Variant; -import com.fasterxml.jackson.core.Base64Variants; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.smile.SmileFactory; - - -/** - * A utility to serialize objects to/from cursors - */ -public class CursorSerializer { - - - private static final SmileFactory SMILE_FACTORY = new SmileFactory(); - - private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY ); - - private static final Base64Variant VARIANT = Base64Variants.MODIFIED_FOR_URL; - - - /** - * Serialize the serializable object as a cursor - */ - public static String asCursor( final Cursor<?> cursor ) { - - final SerializableCursor serializableCursor = new SerializableCursor( cursor.getType(), cursor.getCursor() ); - - try { - return MAPPER.writer( VARIANT ).writeValueAsString( serializableCursor ); - } - catch ( JsonProcessingException e ) { - throw new CursorParseException( "Unable to serialize cursor", e ); - } - } - - - /** - * Deserialize from the cursor - * @param cursor - * @return - * @throws CursorParseException - */ - public SerializableCursor fromCursor( final String cursor ) throws CursorParseException { - try { - - final JsonParser parser = MAPPER.getFactory().createParser( cursor ); - return MAPPER.reader( VARIANT ).readValue( parser, SerializableCursor.class ); - } - catch ( Exception e ) { - throw new CursorParseException( "Unable to serialize cursor", e ); - } - } - - - /** - * The cursor as a serialized value - */ - public static class SerializableCursor{ - private final String type; - - private final Serializable value; - - public SerializableCursor( final String type, final Serializable value ) { - this.type = type; - this.value = value; - } - - - public String getType() { - return type; - } - - - public Serializable getValue() { - return value; - } - } - - /** - * Thrown when we can't parse a cursor - */ - public static class CursorParseException extends RuntimeException { - public CursorParseException( final String message, final Throwable cause ) { - super( message, cause ); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphCursor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphCursor.java deleted file mode 100644 index 20d55ba..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphCursor.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.graph; - - -import org.apache.usergrid.persistence.graph.Edge; - -import com.google.common.base.Optional; - - -/** - * Our cursor implementation for graph - */ -public class GraphCursor { - - private Optional<Edge> lastEdge; - - - /** - * Create the graph from the cursor. This operation is null save, the cursor can be null for undefined - * @param cursor - */ - public GraphCursor(final Optional<String> cursor){ - if(!cursor.isPresent()){ - lastEdge = Optional.absent(); - return; - } - - - - } - - - - private String asCursorString(){ - - final String - return null; - } - - private void parseCursorString(final String cursor){ - - } - - - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphOperations.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphOperations.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphOperations.java deleted file mode 100644 index 598b6bb..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphOperations.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.graph; - - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.model.entity.Id; - -import rx.Observable; - - -/** - * Operations we perform on a graph - */ -public interface GraphOperations { - - - /** - * Write a collection edge and return the observable of the operation - */ - Observable<Edge> writeCollectionEdge( final ApplicationScope applicationScope, final String collectionName, - final Id entityId ); - - /** - * Write the connection edge from the source to the target - */ - Observable<Edge> writeConnectionEdge( final ApplicationScope applicationScope, final String connectionEdge, - final Id sourceId, final Id entityId ); - - - /** - * Read all collection edges in the order specified for the collection name and the application scope - */ - Observable<Edge> readCollectionEdges( final ApplicationScope applicationScope, final String collectionName, - final Order order, final GraphCursor cursor ); - - /** - * Read the - */ - Observable<Edge> readConnectionEdges( final ApplicationScope applicationScope, final String connectionName, - final Id sourceId, final Order order, final GraphCursor graphCursor ); - - - enum Order { - ASCENDING, - DESCENDING; - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java index b79700b..4b43142 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java @@ -48,7 +48,6 @@ public class CollectionResultsLoaderFactoryImpl implements ResultsLoaderFactory verifier = new CollectionRefsVerifier(); } else if ( resultsLevel == Query.Level.IDS ) { -// verifier = new RefsVerifier(); verifier = new IdsVerifier(); } else { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/GraphQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/GraphQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/GraphQueryExecutor.java new file mode 100644 index 0000000..df1a57a --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/GraphQueryExecutor.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.results; + + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.command.CommandBuilder; +import org.apache.usergrid.persistence.EntityRef; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.EntityIndex; +import org.apache.usergrid.persistence.index.IndexScope; +import org.apache.usergrid.persistence.index.SearchTypes; +import org.apache.usergrid.persistence.index.query.CandidateResults; +import org.apache.usergrid.persistence.index.query.Query; +import org.apache.usergrid.persistence.model.entity.Id; + + +/** + * This class is a nasty hack to bridge 2.0 observables into 1.0 iterators + * DO NOT use this as a model for moving forward, pandas will die. + */ +public class GraphQueryExecutor implements QueryExecutor { + + private static final Logger logger = LoggerFactory.getLogger( GraphQueryExecutor.class ); + + + private final ApplicationScope applicationScope; + + private final Id sourceId; + + private final String connectionName; + + + private Iterator<Results> observableIterator; + + + public GraphQueryExecutor(final ApplicationScope appScope, final EntityRef source, final String connectionName ) { + this.applicationScope = appScope; + this.sourceId = + + } + + + @Override + public Iterator<Results> iterator() { + return this; + } + + + private void loadNextPage() { + + } + + + private void build(){ + CommandBuilder commandBuilder = new CommandBuilder( ); + } + + /** + * Build results from a set of candidates, and discard those that represent stale indexes. + * + * @param query Query that was executed + * @param crs Candidates to be considered for results + */ + private Results buildResults( final IndexScope indexScope, final Query query, final CandidateResults crs ) { + + logger.debug( "buildResults() from {} candidates", crs.size() ); + + //get an instance of our results loader + final ResultsLoader resultsLoader = + this.resultsLoaderFactory.getLoader( applicationScope, indexScope, query.getResultsLevel() ); + + //load the results + final Results results = resultsLoader.loadResults( crs ); + + //signal for post processing + resultsLoader.postProcess(); + + + results.setCursor( crs.getCursor() ); + + //ugly and tight coupling, but we don't have a choice until we finish some refactoring + results.setQueryExecutor( this ); + + logger.debug( "Returning results size {}", results.size() ); + + return results; + } + + + @Override + public boolean hasNext() { + + //we've tried to load and it's empty and we have more to load, load the next page + if ( currentResults == null ) { + //there's nothing left to load, nothing to do + if ( !moreToLoad ) { + return false; + } + + //load the page + + loadNextPage(); + } + + + //see if our current results are not null + return currentResults != null; + } + + + @Override + public Results next() { + if ( !hasNext() ) { + throw new NoSuchElementException( "No more results present" ); + } + + final Results toReturn = currentResults; + + currentResults = null; + + return toReturn; + } + + @Override + public void remove() { + throw new RuntimeException("Remove not implemented!!"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java index 6019bca..988cd3b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java @@ -39,6 +39,8 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Optional; + import rx.Observable; import rx.functions.Func1; @@ -89,37 +91,37 @@ public class ApplicationObservable { //we have app infos. For each of these app infos, we have to load the application itself Observable<Id> appIds = gm.loadEdgesFromSource( new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, - null ) ).flatMap( new Func1<Edge, Observable<Id>>() { + Optional.<Edge>absent() ) ).flatMap( new Func1<Edge, Observable<Id>>() { @Override public Observable<Id> call( final Edge edge ) { //get the app info and load it final Id appInfo = edge.getTargetNode(); return collectionManager.load( appInfo ) - //filter out null entities - .filter( new Func1<Entity, Boolean>() { - @Override - public Boolean call( final Entity entity ) { - if ( entity == null ) { - logger.warn( "Encountered a null application info for id {}", appInfo ); - return false; - } - - return true; + //filter out null entities + .filter( new Func1<Entity, Boolean>() { + @Override + public Boolean call( final Entity entity ) { + if ( entity == null ) { + logger.warn( "Encountered a null application info for id {}", appInfo ); + return false; } - } ) - //get the id from the entity - .map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() { + return true; + } + } ) + //get the id from the entity + .map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() { - @Override - public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) { - final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue(); + @Override + public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) { - return CpNamingUtils.generateApplicationId( uuid ); - } - } ); + final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue(); + + return CpNamingUtils.generateApplicationId( uuid ); + } + } ); } } ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java index d3e2ee5..b23886b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java @@ -31,6 +31,8 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Optional; + import rx.Observable; import rx.functions.Func1; @@ -56,7 +58,7 @@ public class EdgesFromSourceObservable { logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode ); return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, null ) ); + SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } } ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java index c5dc54d..3130a72 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java @@ -31,6 +31,8 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Optional; + import rx.Observable; import rx.functions.Func1; @@ -56,7 +58,7 @@ public class EdgesToTargetObservable { logger.debug( "Loading edges of edgeType {} to {}", edgeType, targetNode); return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, null ) ); + SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } } ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java index 35b6a12..ffdfc2a 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java @@ -18,6 +18,8 @@ */ package org.apache.usergrid.persistence.collection; +import java.util.Collection; + import org.apache.usergrid.persistence.model.entity.Id; @@ -31,17 +33,23 @@ public interface EntitySet { * @param entityId * @return */ - public MvccEntity getEntity(Id entityId); + MvccEntity getEntity( Id entityId ); + + /** + * Get all entities in the set + * @return + */ + Collection<MvccEntity> getEntities(); /** * Get the number of entities in this set * @return */ - public int size(); + int size(); /** * Return true if the set is empty * @return */ - public boolean isEmpty(); + boolean isEmpty(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntitySetImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntitySetImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntitySetImpl.java index 921093b..126f9f3 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntitySetImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntitySetImpl.java @@ -18,6 +18,7 @@ package org.apache.usergrid.persistence.collection.serialization.impl;/* */ +import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -32,8 +33,7 @@ public class EntitySetImpl implements EntitySet { private final Map<Id, MvccEntity> entities; - public EntitySetImpl( - final int expectedSize ) { + public EntitySetImpl(final int expectedSize ) { this.entities = new HashMap<>( expectedSize ); } @@ -48,6 +48,12 @@ public class EntitySetImpl implements EntitySet { return entities.get( entityId ); } + public Collection<MvccEntity> getEntities(){ + return entities.values(); + } + + + @Override public int size() { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java index cdad0d1..613585e 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java @@ -94,20 +94,20 @@ public final class OrderedMerge<T> implements Observable.OnSubscribe<T> { innerObservers[i] = inner; } - - /** - * Once we're set up, begin the subscription to sub observables - */ - for ( int i = 0; i < observables.length; i++ ) { - //subscribe after setting them up - //add our subscription to the composite for future cancellation - Subscription subscription = observables[i].subscribe( innerObservers[i] ); - - csub.add( subscription ); - - //add the internal composite subscription - outerOperation.add( csub ); - } +// TODO, if this merge makes it into 2.0-dev remove this and use 2.0 -dev +// /** +// * Once we're set up, begin the subscription to sub observables +// */ +// for ( int i = 0; i < observables.length; i++ ) { +// //subscribe after setting them up +// //add our subscription to the composite for future cancellation +// Subscription subscription = observables[i].subscribe( innerObservers[i] ); +// +// csub.add( subscription ); +// +// //add the internal composite subscription +// outerOperation.add( csub ); +// } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java index 6bc8b1b..1687162 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java @@ -51,21 +51,39 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{ * @param order The order order. Descending is most efficient * @param last The value to start seeking from. Must be >= this value * @param order + * + * //TODO, make last an optional */ - public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, final Edge last - ) { + public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, final Edge last ) { + this(node, type, maxTimestamp, order, Optional.fromNullable(last)); + } + + + /** + * Create the search modules + * @param node The node to search from + * @param type The edge type + * @param maxTimestamp The maximum timestamp to return + * @param order The order order. Descending is most efficient + * @param last The value to start seeking from. Must be >= this value + * @param order + * + * //TODO, make last an optional + */ + public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, final Optional<Edge> last ) { Preconditions.checkNotNull( order, "order is required"); - ValidationUtils.verifyIdentity(node); + ValidationUtils.verifyIdentity( node ); ValidationUtils.verifyString( type, "type" ); GraphValidation.validateTimestamp( maxTimestamp, "maxTimestamp" ); + Preconditions.checkNotNull( last, "last is required" ); this.node = node; this.type = type; this.maxTimestamp = maxTimestamp; this.order = order; - this.last = Optional.fromNullable(last); + this.last = last; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java index f167f0c..03f1df8 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java @@ -32,6 +32,7 @@ import org.apache.usergrid.persistence.core.guice.ProxyImpl; import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra; import org.apache.usergrid.persistence.core.rx.ObservableIterator; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphFig; import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; @@ -160,7 +161,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener { @Override protected Iterator<MarkedEdge> getIterator() { return storageSerialization.getEdgesToTarget( scope, - new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, null ) ); + new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } } ); } @@ -177,7 +178,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener { @Override protected Iterator<MarkedEdge> getIterator() { return storageSerialization.getEdgesFromSource( scope, - new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, null ) ); + new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } } ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java index 6bb467f..6bf6178 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.UUID; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.SearchByIdType; @@ -39,6 +40,7 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Optional; import com.netflix.astyanax.MutationBatch; @@ -272,7 +274,8 @@ public abstract class DirectedEdgeMeta { final Id sourceId = nodes[0].id; final String edgeType = types[0]; - final SearchByEdgeType search = new SimpleSearchByEdgeType( sourceId, edgeType, maxValue, order, null); + final SearchByEdgeType search = new SimpleSearchByEdgeType( sourceId, edgeType, maxValue, order, + Optional.<Edge>absent()); return serialization.getEdgesFromSource( edgeColumnFamilies, scope, search, shards ); } @@ -392,7 +395,7 @@ public abstract class DirectedEdgeMeta { final Id targetId = nodes[0].id; final String edgeType = types[0]; - final SearchByEdgeType search = new SimpleSearchByEdgeType( targetId, edgeType, maxValue, order, null); + final SearchByEdgeType search = new SimpleSearchByEdgeType( targetId, edgeType, maxValue, order, Optional.<Edge>absent()); return serialization.getEdgesToTarget( edgeColumnFamilies, scope, search, shards ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java index d3bf24e..c91fe89 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java @@ -48,6 +48,7 @@ import org.apache.usergrid.persistence.graph.guice.TestGraphModule; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Optional; import com.google.inject.Inject; import rx.Observable; @@ -112,7 +113,8 @@ public class GraphManagerLoadTest { @Override public Observable<Edge> doSearch( final GraphManager manager ) { - return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, null) ); + return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, Optional + .<Edge>absent()) ); } }; @@ -139,7 +141,7 @@ public class GraphManagerLoadTest { @Override public Observable<Edge> doSearch( final GraphManager manager ) { - return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, null ) ); + return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } }; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java index 095f855..d702556 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java @@ -60,6 +60,7 @@ import org.apache.usergrid.persistence.model.entity.Id; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Slf4jReporter; +import com.google.common.base.Optional; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -166,7 +167,7 @@ public class GraphManagerShardConsistencyIT { public Observable<Edge> doSearch( final GraphManager manager ) { return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, null ) ); + SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } };