Massive refactor: the paths used for cursor generation are now part of our I/O results. This allows the collector to keep taking results until it is satisfied, and then generate a serializable path from which the response cursor is built.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cd983d66 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cd983d66 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cd983d66 Branch: refs/heads/two-dot-o-dev Commit: cd983d66260222985431a775454183c2ed2305ea Parents: 6d4847a Author: Todd Nine <[email protected]> Authored: Thu Apr 30 17:40:52 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Thu Apr 30 17:40:52 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpRelationManager.java | 5 +- .../corepersistence/pipeline/Pipeline.java | 9 +- .../pipeline/PipelineContext.java | 16 +- .../pipeline/PipelineOperation.java | 39 ++++ .../pipeline/PipelineResult.java | 57 ----- .../pipeline/cursor/ResponseCursor.java | 81 ++++--- .../pipeline/read/AbstractFilter.java | 45 ++++ .../pipeline/read/AbstractPathFilter.java | 109 +++++++++ .../read/AbstractPipelineOperation.java | 44 ---- .../pipeline/read/AbstractSeekingFilter.java | 102 -------- .../pipeline/read/CandidateResultsFilter.java | 31 --- .../pipeline/read/Collector.java | 13 +- .../pipeline/read/CollectorFactory.java | 12 +- .../corepersistence/pipeline/read/EdgePath.java | 79 +++++++ .../corepersistence/pipeline/read/Filter.java | 9 +- .../pipeline/read/FilterFactory.java | 31 ++- .../pipeline/read/FilterResult.java | 56 +++++ .../pipeline/read/PipelineOperation.java | 38 --- .../pipeline/read/ReadPipelineBuilder.java | 5 +- .../pipeline/read/ReadPipelineBuilderImpl.java | 75 +++--- .../pipeline/read/ResultsPage.java | 26 ++- .../read/collect/AbstractCollector.java | 46 ++++ .../read/collect/ResultsPageCollector.java | 80 +++++++ .../AbstractElasticSearchFilter.java | 47 ++-- .../pipeline/read/elasticsearch/Candidate.java | 55 +++++ .../elasticsearch/CandidateEntityFilter.java | 234 +++++++++++++++++++ 
.../read/elasticsearch/CandidateIdFilter.java | 201 ++++++++++++++++ .../CandidateResultsEntityResultsCollector.java | 217 ----------------- .../CandidateResultsIdVerifyFilter.java | 193 --------------- .../impl/CollectionRefsVerifier.java | 44 ---- .../CollectionResultsLoaderFactoryImpl.java | 65 ------ .../impl/ConnectionRefsVerifier.java | 59 ----- .../ConnectionResultsLoaderFactoryImpl.java | 73 ------ .../impl/ElasticSearchQueryExecutor.java | 224 ------------------ .../read/elasticsearch/impl/EntityVerifier.java | 127 ---------- .../elasticsearch/impl/FilteringLoader.java | 219 ----------------- .../read/elasticsearch/impl/IdsVerifier.java | 46 ---- .../read/elasticsearch/impl/ResultsLoader.java | 43 ---- .../impl/ResultsLoaderFactory.java | 41 ---- .../elasticsearch/impl/ResultsVerifier.java | 52 ----- .../elasticsearch/impl/VersionVerifier.java | 85 ------- .../pipeline/read/entity/EntityIdFilter.java | 53 ----- .../read/entity/EntityLoadCollector.java | 94 -------- .../graph/AbstractReadGraphEdgeByIdFilter.java | 12 +- .../read/graph/AbstractReadGraphFilter.java | 15 +- .../pipeline/read/graph/EntityIdFilter.java | 54 +++++ .../pipeline/read/graph/EntityLoadFilter.java | 155 ++++++++++++ .../graph/ReadGraphConnectionByTypeFilter.java | 20 +- .../results/ObservableQueryExecutor.java | 24 +- .../pipeline/cursor/CursorTest.java | 20 +- .../persistence/index/CandidateResults.java | 11 +- .../impl/EsApplicationEntityIndexImpl.java | 7 +- 52 files changed, 1402 insertions(+), 2096 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 3119934..2790ee1 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -32,7 +32,6 @@ import org.springframework.util.Assert; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory; -import org.apache.usergrid.corepersistence.pipeline.PipelineResult; import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor; @@ -648,7 +647,7 @@ public class CpRelationManager implements RelationManager { } - final Observable<PipelineResult<ResultsPage>> resultsObservable = readPipelineBuilder.execute(); + final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute(); return new ObservableQueryExecutor( resultsObservable ).next(); } @@ -917,7 +916,7 @@ public class CpRelationManager implements RelationManager { } - final Observable<PipelineResult<ResultsPage>> resultsObservable = readPipelineBuilder.execute(); + final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute(); return new ObservableQueryExecutor( resultsObservable ).next(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java index bc93b6c..df6a218 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java @@ -25,7 +25,6 @@ import java.util.List; import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor; import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor; import org.apache.usergrid.corepersistence.pipeline.read.Collector; -import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import com.google.common.base.Optional; @@ -47,7 +46,6 @@ public class Pipeline<R> { private final List<PipelineOperation> idPipelineOperationList; private final Collector<?, R> collector; private final RequestCursor requestCursor; - private final ResponseCursor responseCursor; private final int limit; @@ -69,7 +67,6 @@ public class Pipeline<R> { this.limit = limit; this.requestCursor = new RequestCursor( cursor ); - this.responseCursor = new ResponseCursor(); } @@ -77,7 +74,7 @@ public class Pipeline<R> { * Execute the pipline construction, returning an observable of results * @return */ - public Observable<PipelineResult<R>> execute(){ + public Observable<R> execute(){ Observable traverseObservable = Observable.just( applicationScope.getApplication() ); @@ -99,7 +96,7 @@ public class Pipeline<R> { //append the optional cursor into the response for the caller to use - return response.map( result -> new PipelineResult<>( result, responseCursor ) ); + return response; } @@ -111,7 +108,7 @@ public class Pipeline<R> { private void setState( final PipelineOperation pipelineOperation ) { - final PipelineContext context = new PipelineContext( applicationScope, requestCursor, responseCursor, + final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount ); pipelineOperation.setContext( context ); 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java index 325f876..018abb7 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java @@ -38,16 +38,13 @@ public class PipelineContext { private final int id; private final ApplicationScope applicationScope; private final RequestCursor requestCursor; - private final ResponseCursor responseCursor; private final int limit; - public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, - final ResponseCursor responseCursor, final int limit, final int id ) { + public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, final int limit, final int id ) { this.applicationScope = applicationScope; this.requestCursor = requestCursor; - this.responseCursor = responseCursor; this.limit = limit; this.id = id; } @@ -64,7 +61,7 @@ public class PipelineContext { /** - * Get our cursor value if present + * Get our cursor value if present from our pipline * @param serializer */ public <T extends Serializable> Optional<T> getCursor( final CursorSerializer<T> serializer ) { @@ -73,15 +70,6 @@ public class PipelineContext { return Optional.fromNullable( value ); } - - /** - * Set the cursor value into our resposne - */ - public <T extends Serializable> void setCursorValue( final T value, final CursorSerializer<T> serializer ) { - responseCursor.setCursor( id, value, serializer ); - } - - /** * Get the limit for this execution * @return 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java new file mode 100644 index 0000000..d2fa16c --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline; + + +import org.apache.usergrid.corepersistence.pipeline.PipelineContext; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; + +import rx.Observable; + + +/** + * Interface for filtering commands. All filters must take an observable of Id's as an input. Output is then determined by subclasses. 
+ * This takes an input of Id, performs some operation, and emits values for further processing in the Observable + * pipeline + * @param <T> The input type of the filter value + * @param <R> The output type of the filter value + */ +public interface PipelineOperation<T, R> extends Observable.Transformer<FilterResult<T>, R> { + + void setContext(final PipelineContext pipelineContext); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java deleted file mode 100644 index fe8604e..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline; - - -import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor; - -import com.google.common.base.Optional; - - -/** - * Intermediate observable that will return results, as well as an optional cursor - * @param <R> - */ -public class PipelineResult<R> { - - - private final R result; - - private final ResponseCursor responseCursor; - - - public PipelineResult( final R result, final ResponseCursor responseCursor ) { - this.result = result; - this.responseCursor = responseCursor; - } - - - /** - * If the user requests our cursor, return the cursor - * @return - */ - public Optional<String> getCursor(){ - return this.responseCursor.encodeAsString(); - } - - public R getResult(){ - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java index f1c8c24..dbd8b88 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java @@ -20,12 +20,10 @@ package org.apache.usergrid.corepersistence.pipeline.cursor; -import java.io.Serializable; import java.util.Base64; -import java.util.HashMap; -import java.util.Map; -import com.fasterxml.jackson.core.Base64Variant; +import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -41,71 +39,72 @@ public class ResponseCursor { private 
static final ObjectMapper MAPPER = CursorSerializerUtil.getMapper(); + /** - * We use a map b/c some indexes might be skipped + * The pointer to the first edge path. Evaluation is lazily performed in the case the caller does not care about + * the cursor. */ - private Map<Integer, CursorEntry<?>> cursors = new HashMap<>(); + private final Optional<EdgePath> edgePath; + private Optional<String> encodedValue = null; - /** - * Set the possible cursor value into the index. DOES NOT parse the cursor. This is intentional for performance - */ - public <T extends Serializable> void setCursor( final int id, final T cursor, - final CursorSerializer<T> serializer ) { - final CursorEntry<T> newEntry = new CursorEntry<>( cursor, serializer ); - cursors.put( id, newEntry ); - } + public ResponseCursor( final Optional<EdgePath> edgePath ) {this.edgePath = edgePath;} /** - * now we're done, encode as a string + * Lazyily encoded deliberately. If the user doesn't care about a cursor and is using streams, we dont' want to take the + * time to calculate it */ public Optional<String> encodeAsString() { - try { - if(cursors.isEmpty()){ - return Optional.absent(); - } + //always return cached if we are called 2x + if ( encodedValue != null ) { + return encodedValue; + } + + if ( !edgePath.isPresent() ) { + encodedValue = Optional.absent(); + return encodedValue; + } + + + try { + //no edge path, short circuit final ObjectNode map = MAPPER.createObjectNode(); - for ( Map.Entry<Integer, CursorEntry<?>> entry : cursors.entrySet() ) { - final CursorEntry cursorEntry = entry.getValue(); + Optional<EdgePath> current = edgePath; - final JsonNode serialized = cursorEntry.serializer.toNode( MAPPER, cursorEntry.cursor ); - map.put( entry.getKey().toString(), serialized ); - } + //traverse each edge and add them to our json + do { + + final EdgePath edgePath = current.get(); + final Object cursorValue = edgePath.getCursorValue(); + final CursorSerializer serializer = edgePath.getSerializer(); + 
final int filterId = edgePath.getFilterId(); + + final JsonNode serialized = serializer.toNode( MAPPER, cursorValue ); + map.put( String.valueOf( filterId ), serialized ); + current = current.get().getPrevious(); + } + while ( current.isPresent() ); - final byte[] output = MAPPER.writeValueAsBytes(map); + final byte[] output = MAPPER.writeValueAsBytes( map ); //generate a base64 url save string final String value = Base64.getUrlEncoder().encodeToString( output ); - return Optional.of( value ); - + encodedValue = Optional.of( value ); } catch ( JsonProcessingException e ) { throw new CursorParseException( "Unable to serialize cursor", e ); } - } - - /** - * Interal pointer to the cursor and it's serialzed value - */ - private static final class CursorEntry<T> { - private final T cursor; - private final CursorSerializer<T> serializer; - - - private CursorEntry( final T cursor, final CursorSerializer<T> serializer ) { - this.cursor = cursor; - this.serializer = serializer; - } + return encodedValue; } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java new file mode 100644 index 0000000..e4d5d44 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import org.apache.usergrid.corepersistence.pipeline.PipelineContext; +import org.apache.usergrid.corepersistence.pipeline.PipelineOperation; + + +/** + * Basic functionality for our commands to handle cursor IO + * @param <T> the input type + * @param <R> The output Type + */ +public abstract class AbstractFilter<T, R> implements Filter<T, R> { + + + protected PipelineContext pipelineContext; + + + @Override + public void setContext( final PipelineContext pipelineContext ) { + this.pipelineContext = pipelineContext; + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java new file mode 100644 index 0000000..c68dc4a --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import java.io.Serializable; + +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; + +import com.google.common.base.Optional; + + +/** + * Abstract class for filters to extend that require a cursor + * @param <T> The input type + * @param <R> The response type + * @param <C> The cursor type + */ +public abstract class AbstractPathFilter<T, R, C extends Serializable> extends AbstractFilter<T, R> implements Filter<T, R> { + + + + //TODO not a big fan of this, but not sure how to build resume otherwise + private CursorSeek<C> cursorSeek; + + + /** + * Return the parsed value of the cursor from the last request, if it exists + */ + protected Optional<C> getSeekValue() { + + if(cursorSeek == null) { + final Optional<C> cursor = pipelineContext.getCursor( getCursorSerializer() ); + cursorSeek = new CursorSeek<>( cursor ); + } + + return cursorSeek.getSeekValue(); + + } + + + /** + * Sets the cursor into our pipeline context + */ + protected FilterResult<R> createFilterResult( final R emit, final C cursorValue, final Optional<EdgePath> parent ){ + + + //create a current path, and append our parent path to it + final EdgePath<C> newEdgePath = + new EdgePath<>( pipelineContext.getId(), 
cursorValue, getCursorSerializer(), parent ); + + //emit our value with the parent path + return new FilterResult<>( emit, Optional.of( newEdgePath ) ); + + } + + + /** + * Return the class to be used when parsing the cursor + */ + protected abstract CursorSerializer<C> getCursorSerializer(); + + + /** + * An internal class that holds a mutable state. When resuming, we only ever honor the seek value on the first call. Afterwards, we will seek from the beginning on newly emitted values. + * Calling get will return the first value to seek, or absent if not specified. Subsequent calls will return absent. Callers should treat the results as seek values for each operation + */ + protected static class CursorSeek<C> { + + private Optional<C> seek; + + private CursorSeek(final Optional<C> cursorValue){ + seek = cursorValue; + } + + + /** + * Get the seek value to use when searching + * @return + */ + public Optional<C> getSeekValue(){ + final Optional<C> toReturn = seek; + + seek = Optional.absent(); + + return toReturn; + } + + + + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java deleted file mode 100644 index 8d7f106..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import org.apache.usergrid.corepersistence.pipeline.PipelineContext; - - -/** - * Basic functionality for our commands to handle cursor IO - * @param <T> the input type - * @param <R> The output Type - */ -public abstract class AbstractPipelineOperation<T, R> implements PipelineOperation<T, R> { - - - protected PipelineContext pipelineContext; - - - @Override - public void setContext( final PipelineContext pipelineContext ) { - this.pipelineContext = pipelineContext; - } - - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java deleted file mode 100644 index c23a1b7..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import java.io.Serializable; - -import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; - -import com.google.common.base.Optional; - - -/** - * Abstract class for filters to extend that require a cursor - * @param <T> The input type - * @param <R> The response type - * @param <C> The cursor type - */ -public abstract class AbstractSeekingFilter<T, R, C extends Serializable> extends AbstractPipelineOperation<T, R> implements Filter<T, R> { - - - - //TODO not a big fan of this, but not sure how to build resume otherwise - private CursorSeek<C> cursorSeek; - - - /** - * Return the parsed value of the cursor from the last request, if it exists - */ - protected Optional<C> getSeekValue() { - - if(cursorSeek == null) { - final Optional<C> cursor = pipelineContext.getCursor( getCursorSerializer() ); - cursorSeek = new CursorSeek<>( cursor ); - } - - return cursorSeek.getSeekValue(); - - } - - - /** - * Sets the cursor into our pipeline context - * @param newValue - */ - protected void setCursor(final C newValue){ - pipelineContext.setCursorValue( newValue, getCursorSerializer() ); - } - - - /** - * Return the class to be used when parsing the cursor - */ - protected abstract 
CursorSerializer<C> getCursorSerializer(); - - - /** - * An internal class that holds a mutable state. When resuming, we only ever honor the seek value on the first call. Afterwards, we will seek from the beginning on newly emitted values. - * Calling get will return the first value to seek, or absent if not specified. Subsequent calls will return absent. Callers should treat the results as seek values for each operation - */ - protected static class CursorSeek<C> { - - private Optional<C> seek; - - private CursorSeek(final Optional<C> cursorValue){ - seek = cursorValue; - } - - - /** - * Get the seek value to use when searching - * @return - */ - public Optional<C> getSeekValue(){ - final Optional<C> toReturn = seek; - - seek = Optional.absent(); - - return toReturn; - } - - - - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java deleted file mode 100644 index 4e6d06e..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import org.apache.usergrid.persistence.index.CandidateResults; -import org.apache.usergrid.persistence.model.entity.Id; - - -/** - * Traverses edges in the graph. Either by query or graph traversal. Take an observable of ids, and emits - * an observable of ids - */ -public interface CandidateResultsFilter extends PipelineOperation<Id, CandidateResults> {} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java index 69d929c..e28ce44 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java @@ -20,11 +20,18 @@ package org.apache.usergrid.corepersistence.pipeline.read; +import org.apache.usergrid.corepersistence.pipeline.PipelineOperation; + + /** - * A command that is used to reduce our stream of results into a final output - * @param <T> + * A command that is used to reduce our stream of results into a stream of final batch outputs. When used + * no further transformation or encoding should occur. 
Otherwise EdgePath data will be lost, and serialization cannot occur + * across requests + * + * @param <T> The input type + * @param <R> The output type */ -public interface Collector<T, R> extends PipelineOperation<T, R> { +public interface Collector<T, R> extends PipelineOperation<T,R> { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java index 6893b34..dd200b5 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java @@ -20,8 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline.read; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector; -import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector; +import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector; /** @@ -29,16 +28,11 @@ import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollec */ public interface CollectorFactory { - /** - * Generate a new instance of the command with the specified parameters - */ - EntityLoadCollector entityLoadCollector(); /** - * Get the collector for collection candidate results to entities + * Get the results page collector * @return */ - CandidateResultsEntityResultsCollector candidateResultsEntityResultsCollector(); - + ResultsPageCollector getResultsPageCollector(); } 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java new file mode 100644 index 0000000..c560fad --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; + +import com.google.common.base.Optional; + + +/** + * A path from our input element to our emitted element. A list of EdgePaths comprise a path through the graph. The chains of edge paths will result + * in a cursor when aggregated. If a graph traversal is the following + * + * applicationId(1) - "users" -> userId(2) - "devices" -> deviceId(3). 
There would be 2 EdgePath + * + * EdgePath("users"->userId(2)) <- parent - EdgePath("devices" -> deviceId(3)) + */ +public class EdgePath<C> { + + + private final int filterId; + private final C cursorValue; + private final CursorSerializer<C> serializer; + private final Optional<EdgePath> previous; + + + /** + * + * @param filterId The id of the filter that generated this path + * @param cursorValue The value to resume seeking on the path + * @param serializer The serializer to serialize the value + * @param parent The parent graph path edge to reach this path + */ + public EdgePath( final int filterId, final C cursorValue, final CursorSerializer<C> serializer, + final Optional<EdgePath> parent ) { + this.filterId = filterId; + this.cursorValue = cursorValue; + this.serializer = serializer; + this.previous = parent; + } + + + public C getCursorValue() { + return cursorValue; + } + + + public int getFilterId() { + return filterId; + } + + + public Optional<EdgePath> getPrevious() { + return previous; + } + + + public CursorSerializer<C> getSerializer() { + return serializer; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java index ace62db..054a85a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java @@ -20,11 +20,12 @@ package org.apache.usergrid.corepersistence.pipeline.read; -import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.corepersistence.pipeline.PipelineOperation; /** - * Traverses edges in the graph. 
Either by query or graph traversal. Take an observable of ids, and emits - * an observable of ids + * Traverses edges in the graph. Either by query or graph traversal. Take an observable of FilterResult, and emits + * an observable of FilterResults. Filters should never emit groups or objects that represent collections. Items should + * always be emitted 1 at a time. It is the responsibility of the collector to aggregate results. */ -public interface Filter<T, R> extends PipelineOperation<T, R> {} +public interface Filter<T, R> extends PipelineOperation<T, FilterResult<R>> {} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java index 078d981..c465516 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java @@ -20,10 +20,12 @@ package org.apache.usergrid.corepersistence.pipeline.read; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsIdVerifyFilter; +import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter; +import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter; import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter; import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchConnectionFilter; -import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityIdFilter; +import 
org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter; import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter; import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter; import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter; @@ -43,6 +45,7 @@ public interface FilterFactory { /** * Generate a new instance of the command with the specified parameters + * * @param collectionName The collection name to use when reading the graph */ ReadGraphCollectionFilter readGraphCollectionFilter( final String collectionName ); @@ -57,12 +60,14 @@ public interface FilterFactory { /** * Generate a new instance of the command with the specified parameters + * * @param connectionName The connection name to use when traversing the graph */ ReadGraphConnectionFilter readGraphConnectionFilter( final String connectionName ); /** * Generate a new instance of the command with the specified parameters + * * @param connectionName The connection name to use when traversing the graph * @param entityType The entity type to use when traversing the graph */ @@ -72,13 +77,15 @@ public interface FilterFactory { /** * Read a connection directly between two identifiers + * * @param connectionName The connection name to use when traversing the graph - * @param targetId The target Id to use when traversing the graph + * @param targetId The target Id to use when traversing the graph */ ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final String connectionName, final Id targetId ); /** * Generate a new instance of the command with the specified parameters + * * @param query The query to use when querying the entities in the collection * @param collectionName The collection name to use when querying */ @@ -90,6 +97,7 @@ public interface FilterFactory { /** * Generate a new instance of the command with the specified parameters + * * @param query The query to use when 
querying the entities in the connection * @param connectionName The type of connection to query * @param connectedEntityType The type of entity in the connection. Leave absent to query all entity types @@ -102,13 +110,24 @@ public interface FilterFactory { /** - * Get a candidate ids verifier for collection results. Should be inserted into pipelines where a query filter is an intermediate step, - * not a final filter before collectors + * Generate a new instance of the command with the specified parameters + */ + EntityLoadFilter entityLoadFilter(); + + /** + * Get the collector for collection candidate results to entities */ - CandidateResultsIdVerifyFilter candidateResultsIdVerifyFilter(); + CandidateEntityFilter candidateEntityFilter(); + + /** + * Get a candidate ids verifier for collection results. Should be inserted into pipelines where a query filter is + * an intermediate step, not a final filter before collectors + */ + CandidateIdFilter candidateResultsIdVerifyFilter(); /** * Get an entity id filter. Used as a 1.0->2.0 bridge since we're not doing full traversals + * * @param entityId The entity id to emit */ EntityIdFilter getEntityIdFilter( final Id entityId ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java new file mode 100644 index 0000000..3c41a2b --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import com.google.common.base.Optional; + + +/** + * A bean that is passed between filters with immutable cursor state + * @param <T> + */ +public class FilterResult<T> { + private final T value; + private final Optional<EdgePath> path; + + + /** + * Create a new immutable filtervalue + * @param value The value the filter emits + * @param path The path to this value, if created + */ + public FilterResult( final T value, final Optional<EdgePath> path ) { + this.value = value; + this.path = path; + } + + + public T getValue() { + return value; + } + + + public Optional<EdgePath> getPath() { + return path; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java deleted file mode 100644 index 28bba36..0000000 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import org.apache.usergrid.corepersistence.pipeline.PipelineContext; - -import rx.Observable; - - -/** - * Interface for filtering commands. All filters must take an observable of Id's as an input. Output is then determined by subclasses. 
- * This takes an input of Id, performs some operation, and emits values for further processing in the Observable - * pipeline - * @param <T> The input type - * @param <R> - */ -public interface PipelineOperation< T, R> extends Observable.Transformer<T, R> { - - void setContext(final PipelineContext pipelineContext); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java index 25ab03e..d0e87b3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java @@ -20,9 +20,6 @@ package org.apache.usergrid.corepersistence.pipeline.read; -import org.apache.usergrid.corepersistence.pipeline.PipelineResult; -import org.apache.usergrid.persistence.Entity; -import org.apache.usergrid.persistence.Results; import org.apache.usergrid.persistence.model.entity.Id; import com.google.common.base.Optional; @@ -103,5 +100,5 @@ public interface ReadPipelineBuilder { * Load our entity results when our previous filter calls graph * @return */ - Observable<PipelineResult<ResultsPage>> execute(); + Observable<ResultsPage> execute(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java index 4ecfb47..ffb9f7d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java @@ -24,9 +24,9 @@ import java.util.ArrayList; import java.util.List; import org.apache.usergrid.corepersistence.pipeline.Pipeline; -import org.apache.usergrid.corepersistence.pipeline.PipelineResult; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector; -import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector; +import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter; +import org.apache.usergrid.persistence.Entity; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.util.ValidationUtils; import org.apache.usergrid.persistence.model.entity.Id; @@ -52,6 +52,8 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { private final ApplicationScope applicationScope; + private final CollectorFactory collectorFactory; + /** * Our pointer to our collect filter. 
Set or cleared with each operation that's performed so the correct results are @@ -70,6 +72,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { this.filterFactory = filterFactory; this.applicationScope = applicationScope; + this.collectorFactory = collectorFactory; //init our cursor to empty this.cursor = Optional.absent(); @@ -78,7 +81,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { this.limit = DEFAULT_LIMIT; - this.collectorState = new CollectorState( collectorFactory ); + this.collectorState = new CollectorState( ); this.filters = new ArrayList<>(); } @@ -120,7 +123,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { filters.add( filterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) ); - this.collectorState.setEntityLoaderCollector(); + this.collectorState.setIdEntityLoaderFilter(); return this; } @@ -132,7 +135,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { filters.add( filterFactory.readGraphCollectionFilter( collectionName ) ); - this.collectorState.setEntityLoaderCollector(); + this.collectorState.setIdEntityLoaderFilter(); return this; } @@ -147,7 +150,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { filters.add( filterFactory.elasticSearchCollectionFilter( query, collectionName, entityType ) ); - this.collectorState.setCandidateResultsEntityResultsCollector(); + this.collectorState.setCandidateEntityFilter(); return this; } @@ -159,7 +162,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { ValidationUtils.verifyIdentity( entityId ); filters.add( filterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) ); - collectorState.setEntityLoaderCollector(); + collectorState.setIdEntityLoaderFilter(); return this; } @@ -169,7 +172,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { public ReadPipelineBuilder getConnection( final String connectionName ) { 
Preconditions.checkNotNull( connectionName, "connectionName must not be null" ); filters.add( filterFactory.readGraphConnectionFilter( connectionName ) ); - collectorState.setEntityLoaderCollector(); + collectorState.setIdEntityLoaderFilter(); return this; } @@ -182,7 +185,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { filters.add( filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType ) ); - collectorState.setEntityLoaderCollector(); + collectorState.setIdEntityLoaderFilter(); return this; } @@ -196,17 +199,25 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { Preconditions.checkNotNull( query, "query must not be null" ); filters.add( filterFactory.elasticSearchConnectionFilter( query, connectionName, entityType ) ); - collectorState.setCandidateResultsEntityResultsCollector(); + collectorState.setCandidateEntityFilter(); return this; } @Override - public Observable<PipelineResult<ResultsPage>> execute() { + public Observable<ResultsPage> execute() { ValidationUtils.validateApplicationScope( applicationScope ); - final Collector<?, ResultsPage> collector = collectorState.getCollector(); + + //add our last filter that will generate entities + final Filter<?, Entity> finalFilter = collectorState.getFinalFilter(); + + filters.add( finalFilter ); + + + //execute our collector + final Collector<?, ResultsPage> collector = collectorFactory.getResultsPageCollector(); Preconditions.checkNotNull( collector, "You have not specified an operation that creates a collection filter. This is required for loading " @@ -229,46 +240,52 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { * A mutable state for our collectors. 
Rather than create a new instance each time, we create a singleton * collector */ - private static final class CollectorState { - private final CollectorFactory collectorFactory; + private final class CollectorState { + - private EntityLoadCollector entityLoadCollector; + private EntityLoadFilter entityLoadCollector; - private CandidateResultsEntityResultsCollector candidateResultsEntityResultsCollector; + private CandidateEntityFilter candidateEntityFilter; + private Filter entityLoadFilter; - private Collector<?, ResultsPage> collector = null; - private CollectorState( final CollectorFactory collectorFactory ) {this.collectorFactory = collectorFactory;} + private CollectorState( ){} - public void setEntityLoaderCollector() { + /** + * Set our final filter to be a load entity by Id filter + */ + public void setIdEntityLoaderFilter() { if ( entityLoadCollector == null ) { - entityLoadCollector = collectorFactory.entityLoadCollector(); + entityLoadCollector = filterFactory.entityLoadFilter(); } - collector = entityLoadCollector; + entityLoadFilter = entityLoadCollector; } - public void setCandidateResultsEntityResultsCollector() { - if ( candidateResultsEntityResultsCollector == null ) { - candidateResultsEntityResultsCollector = collectorFactory.candidateResultsEntityResultsCollector(); + /** + * Set our final filter to be a load entity by candidate filter + */ + public void setCandidateEntityFilter() { + if ( candidateEntityFilter == null ) { + candidateEntityFilter = filterFactory.candidateEntityFilter(); } - collector = candidateResultsEntityResultsCollector; + entityLoadFilter = candidateEntityFilter; } public void clear() { - collector = null; + entityLoadFilter = null; } - public Collector<?, ResultsPage> getCollector() { - return collector; + public Filter<?, Entity> getFinalFilter() { + return entityLoadFilter; } } } 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java index 198ac67..1810d65 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java @@ -22,18 +22,28 @@ package org.apache.usergrid.corepersistence.pipeline.read; import java.util.List; +import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor; import org.apache.usergrid.persistence.model.entity.Entity; /** - * An encapsulation of entities as a group of responses. Ordered by the requesting filters. Each set should be considered a "page" of results. + * An encapsulation of entities as a group of responses. Ordered by the requesting filters. Each set should be + * considered a "page" of results. A hold over from 1.0. 
We shouldn't need this when we fully move away from the EM/RM */ public class ResultsPage { private final List<Entity> entityList; + private final int limit; - public ResultsPage( final List<Entity> entityList ) {this.entityList = entityList;} + private final ResponseCursor responseCursor; + + + public ResultsPage( final List<Entity> entityList, final ResponseCursor responseCursor, final int limit ) { + this.entityList = entityList; + this.responseCursor = responseCursor; + this.limit = limit; + } public List<Entity> getEntityList() { @@ -43,9 +53,15 @@ public class ResultsPage { /** * Return true if the results page is empty - * @return */ - public boolean isEmpty(){ - return entityList == null || entityList.isEmpty(); + public boolean hasMoreResults() { + return entityList != null && entityList.size() == limit; + } + + + + + public ResponseCursor getResponseCursor() { + return responseCursor; } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java new file mode 100644 index 0000000..1c5175d --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.collect; + + +import org.apache.usergrid.corepersistence.pipeline.PipelineContext; +import org.apache.usergrid.corepersistence.pipeline.read.Collector; +import org.apache.usergrid.corepersistence.pipeline.read.Filter; + + +/** + * Basic functionality for our commands to handle cursor IO + * @param <T> the input type + * @param <R> The output Type + */ +public abstract class AbstractCollector<T, R> implements Collector<T, R> { + + + protected PipelineContext pipelineContext; + + + @Override + public void setContext( final PipelineContext pipelineContext ) { + this.pipelineContext = pipelineContext; + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java new file mode 100644 index 0000000..84654aa --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.collect; + + +import java.util.ArrayList; +import java.util.List; + +import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor; +import org.apache.usergrid.corepersistence.pipeline.read.Collector; +import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; +import org.apache.usergrid.persistence.model.entity.Entity; + +import com.google.common.base.Optional; + +import rx.Observable; + + +/** + * Takes entities and collects them into results. This mostly exists for 1.0 compatibility. Eventually this will + * become the only collector in our pipline and be used when rendering results, both on GET, PUT and POST. 
+ */ +public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage> + implements Collector<Entity, ResultsPage> { + + + @Override + public Observable<ResultsPage> call( final Observable<FilterResult<Entity>> filterResultObservable ) { + + final int limit = pipelineContext.getLimit(); + + return filterResultObservable.buffer( limit ).flatMap( buffer -> Observable.from( buffer ).collect( + () -> new ResultsPageWithCursorCollector( limit ), ( collector, entity ) -> { + collector.add( entity ); + } ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results, + new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit() ) ); + } + + + /** + * A collector that will aggregate our results together + */ + private static class ResultsPageWithCursorCollector { + + + private final List<Entity> results; + + private Optional<EdgePath> lastPath; + + + private ResultsPageWithCursorCollector( final int limit ) { + this.results = new ArrayList<>( limit ); + } + + + public void add( final FilterResult<Entity> result ) { + this.results.add( result.getValue() ); + this.lastPath = result.getPath(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java index eac8a65..004a696 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java @@ -24,11 +24,13 @@ import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter; -import org.apache.usergrid.corepersistence.pipeline.read.CandidateResultsFilter; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; +import org.apache.usergrid.corepersistence.pipeline.read.Filter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.metrics.ObservableTimer; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.CandidateResult; import org.apache.usergrid.persistence.index.CandidateResults; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.SearchEdge; @@ -44,8 +46,8 @@ import rx.Observable; /** * Command for reading graph edges */ -public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<Id, CandidateResults, Integer> - implements CandidateResultsFilter { +public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id, Candidate, Integer> + implements Filter<Id, Candidate> { private static final Logger log = LoggerFactory.getLogger( AbstractElasticSearchFilter.class ); @@ -66,7 +68,7 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter< @Override - public Observable<CandidateResults> call( final Observable<Id> observable ) { + public Observable<FilterResult<Candidate>> call( final Observable<FilterResult<Id>> observable ) { //get the graph manager final ApplicationEntityIndex applicationEntityIndex = @@ -80,12 +82,12 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter< //return all ids that are emitted from this edge - return observable.flatMap( id -> { + return 
observable.flatMap( idFilterResult -> { - final SearchEdge searchEdge = getSearchEdge( id ); + final SearchEdge searchEdge = getSearchEdge( idFilterResult.getValue() ); - final Observable<CandidateResults> candidates = Observable.create( subscriber -> { + final Observable<FilterResult<Candidate>> candidates = Observable.create( subscriber -> { //our offset to our start value. This will be set the first time we emit //after we receive new ids, we want to reset this to 0 @@ -98,19 +100,14 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter< subscriber.onStart(); - //emit while we have values from ES - while ( true ) { + //emit while we have values from ES and someone is subscribed + while ( !subscriber.isUnsubscribed() ) { try { final CandidateResults candidateResults = applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet ); - currentOffSet += candidateResults.size(); - - //set the cursor for the next value - setCursor( currentOffSet ); - /** * No candidates, we're done */ @@ -119,7 +116,25 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter< return; } - subscriber.onNext( candidateResults ); + + for( CandidateResult candidateResult: candidateResults){ + + //our subscriber unsubscribed, break out + if(subscriber.isUnsubscribed()){ + return; + } + + final Candidate candidate = new Candidate( candidateResult, searchEdge ); + + final FilterResult<Candidate> + result = createFilterResult( candidate, currentOffSet, idFilterResult.getPath() ); + + subscriber.onNext( result ); + + currentOffSet++; + } + + } catch ( Throwable t ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java new file mode 100644 index 0000000..ab9d5d9 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; + + +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.index.SearchEdge; + + +/** + * Create a candidate. 
This holds the original candidate result, together with the search edge it was found on.
+ */
+public class Candidate {
+
+    /** The raw result returned by the index search. */
+    private final CandidateResult result;
+
+    /** The search edge the result was found on. */
+    private final SearchEdge edge;
+
+
+    /**
+     * Create a new Candidate for further processing
+     *
+     * @param candidateResult The candidate result returned by the search
+     * @param searchEdge The search edge this candidate was found on
+     */
+    public Candidate( final CandidateResult candidateResult, final SearchEdge searchEdge ) {
+        result = candidateResult;
+        edge = searchEdge;
+    }
+
+
+    /**
+     * @return The candidate result supplied at construction
+     */
+    public CandidateResult getCandidateResult() {
+        return this.result;
+    }
+
+
+    /**
+     * @return The search edge supplied at construction
+     */
+    public SearchEdge getSearchEdge() {
+        return this.edge;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
new file mode 100644
index 0000000..d30917c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; + + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; +import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; +import org.apache.usergrid.corepersistence.pipeline.read.Filter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.index.EntityIndexBatch; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.fasterxml.uuid.UUIDComparator; +import com.google.common.base.Optional; +import com.google.inject.Inject; + +import rx.Observable; + + +/** + * Loads entities from an incoming CandidateResult emissions into entities, then streams them on + * performs internal 
buffering for efficiency. Note that not every entity may be emitted if our load crosses page boundaries. It is up to the
+ * collector to determine when to stop streaming entities.
+ */
+public class CandidateEntityFilter extends AbstractFilter<Candidate, Entity>
+    implements Filter<Candidate, Entity> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+
+
+    @Inject
+    public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                  final EntityIndexFactory entityIndexFactory ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.entityIndexFactory = entityIndexFactory;
+    }
+
+
+    /**
+     * Buffer incoming candidates into pages of {@code pipelineContext.getLimit()}, load each page in one call to the
+     * entity collection manager, then verify every candidate against the loaded entity and emit only the valid ones.
+     *
+     * @param candidateResultsObservable The candidates emitted by the upstream filter
+     * @return The verified entities, each carrying the path of the candidate it was loaded from
+     */
+    @Override
+    public Observable<FilterResult<Entity>> call(
+        final Observable<FilterResult<Candidate>> candidateResultsObservable ) {
+
+
+        /**
+         * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
+         * objects
+         */
+
+        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+        final ApplicationEntityIndex applicationIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+        //buffer them to get a page size we can load in 1 network hop
+        return candidateResultsObservable.buffer( pipelineContext.getLimit() )
+
+            //load them
+            .flatMap( candidateResults -> {
+                //flatten to a list of ids to load
+                final Observable<List<Id>> candidateIds = Observable.from( candidateResults ).map(
+                    filterResultCandidate -> filterResultCandidate.getValue().getCandidateResult().getId() ).toList();
+
+                //load the ids
+                final Observable<EntitySet> entitySetObservable =
+                    candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
+
+                //now we have a collection, validate our candidate set is correct. Candidates that fail
+                //verification are dropped, so a page may emit fewer entities than candidates it received;
+                //deciding when enough entities have been streamed is left to the downstream collector
+                return entitySetObservable.map(
+                    entitySet -> new EntityVerifier( applicationIndex.createBatch(), entitySet, candidateResults ) )
+                    .doOnNext( entityVerifier -> entityVerifier.merge() )
+                    .flatMap( entityVerifier -> Observable.from( entityVerifier.getResults() ) );
+            } );
+    }
+
+
+    /**
+     * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this
+     * state is mutable and difficult to represent functionally
+     */
+    private static final class EntityVerifier {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+
+        /** Verified entities, in candidate order */
+        private final List<FilterResult<Entity>> results;
+
+        private final EntityIndexBatch batch;
+        private final List<FilterResult<Candidate>> candidateResults;
+        private final EntitySet entitySet;
+
+
+        public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
+                               final List<FilterResult<Candidate>> candidateResults ) {
+            this.batch = batch;
+            this.entitySet = entitySet;
+            this.candidateResults = candidateResults;
+            this.results = new ArrayList<>( entitySet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our entity set into results, then execute any deindex operations queued on the
+         * batch while validating
+         */
+        public void merge() {
+
+            for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+
+            batch.execute();
+        }
+
+
+        /**
+         * @return The verified entities collected by {@link #merge()}
+         */
+        public List<FilterResult<Entity>> getResults() {
+            return results;
+        }
+
+
+        public EntityIndexBatch getBatch() {
+            return batch;
+        }
+
+
+        /**
+         * Compare a single candidate against the loaded entity. Matching versions are added to the results; stale
+         * candidates are queued for deindexing; missing entities or candidates newer than the stored entity are
+         * logged and dropped.
+         */
+        private void validate( final FilterResult<Candidate> filterResult ) {
+
+            final Candidate candidate = filterResult.getValue();
+            final CandidateResult candidateResult = candidate.getCandidateResult();
+            final SearchEdge searchEdge = candidate.getSearchEdge();
+            final Id candidateId = candidateResult.getId();
+            final UUID candidateVersion = candidateResult.getVersion();
+
+
+            final MvccEntity entity = entitySet.getEntity( candidateId );
+
+
+            //doesn't exist warn and drop
+            if ( entity == null ) {
+                logger.warn(
+                    "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
+                        + " Ignoring since this could be a region sync issue",
+                    candidateId, candidateVersion );
+
+
+                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+            }
+
+
+            final UUID entityVersion = entity.getVersion();
+            final Id entityId = entity.getId();
+
+
+            //entity is newer than ES version, could be an update or the entity is marked as deleted
+            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent() ) {
+
+                //the document the search returned was indexed at candidateVersion, so that is the stale version
+                //to remove; deindexing entityVersion would leave the stale document in the index
+                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+                    new Object[] { searchEdge, entityId, candidateVersion } );
+                batch.deindex( searchEdge, entityId, candidateVersion );
+                return;
+            }
+
+            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+            //remove the ES record, since the read in cass should cause a read repair, just ignore
+            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+                logger.warn(
+                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair "
+                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+
+                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+            }
+
+            //they're the same add it; presence was checked above, so get() is safe
+
+            final Entity returnEntity = entity.getEntity().get();
+
+            final Optional<EdgePath> parent = filterResult.getPath();
+
+            final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+            results.add( toReturn );
+        }
+    }
+}
