Refactor into commands WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d5978cca Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d5978cca Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d5978cca Branch: refs/heads/USERGRID-593 Commit: d5978cca07366c43314687081973f48a80b61d8c Parents: 60617d2 Author: Todd Nine <tn...@apigee.com> Authored: Fri Apr 24 14:45:50 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Fri Apr 24 14:45:50 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpRelationManager.java | 6 +- .../corepersistence/command/CommandBuilder.java | 10 +- .../command/read/CollectCommand.java | 36 --- .../corepersistence/command/read/Collector.java | 36 +++ .../command/read/ReadCommandFactory.java | 81 +++++++ .../AbstractQueryElasticSearchCollector.java | 107 +++++++++ .../QueryCollectionElasticSearchCollector.java | 62 +++++ .../QueryConnectionElasticSearchCollector.java | 62 +++++ .../impl/CollectionRefsVerifier.java | 44 ++++ .../CollectionResultsLoaderFactoryImpl.java | 59 +++++ .../impl/ConnectionRefsVerifier.java | 59 +++++ .../ConnectionResultsLoaderFactoryImpl.java | 65 +++++ .../impl/ElasticSearchQueryExecutor.java | 238 ++++++++++++++++++ .../read/elasticsearch/impl/EntityVerifier.java | 127 ++++++++++ .../elasticsearch/impl/FilteringLoader.java | 226 +++++++++++++++++ .../read/elasticsearch/impl/IdsVerifier.java | 46 ++++ .../read/elasticsearch/impl/ResultsLoader.java | 43 ++++ .../impl/ResultsLoaderFactory.java | 41 ++++ .../elasticsearch/impl/ResultsVerifier.java | 52 ++++ .../elasticsearch/impl/VersionVerifier.java | 85 +++++++ .../read/entity/EntityLoadCollector.java | 138 +++++++++++ .../command/read/entity/EntityLoadCommand.java | 134 ----------- .../read/graph/ReadGraphCollectionCommand.java | 3 + .../read/graph/ReadGraphConnectionCommand.java | 3 + 
.../results/AbstractGraphQueryExecutor.java | 4 +- .../results/CollectionRefsVerifier.java | 44 ---- .../CollectionResultsLoaderFactoryImpl.java | 59 ----- .../results/ConnectionRefsVerifier.java | 61 ----- .../ConnectionResultsLoaderFactoryImpl.java | 65 ----- .../results/ElasticSearchQueryExecutor.java | 240 ------------------- .../corepersistence/results/EntityVerifier.java | 127 ---------- .../results/FilteringLoader.java | 226 ----------------- .../corepersistence/results/IdsVerifier.java | 46 ---- .../results/ObservableQueryExecutor.java | 76 ++++++ .../corepersistence/results/ResultsLoader.java | 43 ---- .../results/ResultsLoaderFactory.java | 41 ---- .../results/ResultsVerifier.java | 52 ---- .../results/VersionVerifier.java | 85 ------- 38 files changed, 1663 insertions(+), 1269 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 86ba300..de7fef1 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -32,10 +32,10 @@ import org.springframework.util.Assert; import org.apache.usergrid.corepersistence.index.AsyncIndexService; import org.apache.usergrid.corepersistence.results.CollectionGraphQueryExecutor; -import org.apache.usergrid.corepersistence.results.CollectionResultsLoaderFactoryImpl; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.CollectionResultsLoaderFactoryImpl; import org.apache.usergrid.corepersistence.results.ConnectionGraphQueryExecutor; 
-import org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl; -import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ElasticSearchQueryExecutor; import org.apache.usergrid.corepersistence.results.QueryExecutor; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java index de76f50..d9d6a12 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java @@ -22,7 +22,7 @@ package org.apache.usergrid.corepersistence.command; import org.apache.usergrid.corepersistence.command.cursor.RequestCursor; import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor; -import org.apache.usergrid.corepersistence.command.read.CollectCommand; +import org.apache.usergrid.corepersistence.command.read.Collector; import org.apache.usergrid.corepersistence.command.read.Command; import org.apache.usergrid.corepersistence.command.read.TraverseCommand; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -87,12 +87,12 @@ public class CommandBuilder { /** * Build the final collection step, and */ - public <T> Observable<T> build( final CollectCommand<T> collectCommand ) { - setState( collectCommand ); + 
public <T> Observable<T> build( final Collector<T> collector ) { + setState( collector ); - collectCommand.setLimit( limit ); + collector.setLimit( limit ); - return currentObservable.compose( collectCommand ); + return currentObservable.compose( collector ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java deleted file mode 100644 index c2ad931..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.command.read; - - -/** - * A command that is used to reduce our stream of results into a final output - * @param <T> - */ -public interface CollectCommand<T> extends Command<T>{ - - /** - * Set the prefered result size for the command - * @param resultSize - */ - void setLimit( final int resultSize ); - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Collector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Collector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Collector.java new file mode 100644 index 0000000..2804db4 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Collector.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.command.read; + + +/** + * A command that is used to reduce our stream of results into a final output + * @param <T> + */ +public interface Collector<T> extends Command<T>{ + + /** + * Set the prefered result size for the command + * @param limit + */ + void setLimit( final int limit ); + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/ReadCommandFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/ReadCommandFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/ReadCommandFactory.java new file mode 100644 index 0000000..07f300a --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/ReadCommandFactory.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.command.read; + + +import org.apache.usergrid.corepersistence.command.read.elasticsearch.QueryCollectionElasticSearchCollector; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.QueryConnectionElasticSearchCollector; +import org.apache.usergrid.corepersistence.command.read.entity.EntityLoadCollector; +import org.apache.usergrid.corepersistence.command.read.graph.ReadGraphCollectionCommand; +import org.apache.usergrid.corepersistence.command.read.graph.ReadGraphConnectionCommand; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + + +/** + * A factory for generating read commands + */ +public interface ReadCommandFactory { + + + /** + * Generate a new instance of the command with the specified parameters + * @param applicationScope + * @param sourceId + * @param collectionName + * @return + */ + ReadGraphCollectionCommand readGraphCollectionCommand(final ApplicationScope applicationScope, final Id sourceId, final String collectionName); + + /** + * Generate a new instance of the command with the specified parameters + * @param applicationScope + * @param sourceId + * @param connectionName + * @return + */ + ReadGraphConnectionCommand readGraphConnectionCommand(final ApplicationScope applicationScope, final Id sourceId, final String connectionName); + + /** + * Generate a new instance of the command with the specified parameters + * @param applicationScope + * @return + */ + EntityLoadCollector entityLoadCollector(final ApplicationScope applicationScope); + + /** + * Generate a new instance of the command with the specified parameters + * @param applicationScope + * @param sourceId + * @param collectionName + * @return + */ + QueryCollectionElasticSearchCollector queryCollectionElasticSearchCollector(final ApplicationScope applicationScope, final Id sourceId, final String collectionName); + + + /** + * Generate a new 
instance of the command with the specified parameters + * @param applicationScope + * @param sourceId + * @param connectionName + * @return + */ + QueryConnectionElasticSearchCollector queryConnectionElasticSearchCollector(final ApplicationScope applicationScope, final Id sourceId, final String connectionName); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/AbstractQueryElasticSearchCollector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/AbstractQueryElasticSearchCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/AbstractQueryElasticSearchCollector.java new file mode 100644 index 0000000..18cdeb0 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/AbstractQueryElasticSearchCollector.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch; + + +import java.util.Iterator; + +import org.apache.usergrid.corepersistence.command.read.AbstractCommand; +import org.apache.usergrid.corepersistence.command.read.Collector; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ElasticSearchQueryExecutor; +import org.apache.usergrid.corepersistence.results.QueryExecutor; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ResultsLoaderFactory; +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.SearchTypes; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; + +import rx.Observable; + + +/** + * A command that will query and load elasticsearch + * + * On future iteration, this needs to be split into 2 commands 1 that loads the candidate results and validates the + * versions then another that will search and load + * + * TODO, split this into 3 seperate observables + * + * 1) An observable that emits candidate results + * 2) An observable that validates versions by uuid (for Traverse commands) + * 3) An observbale that emits Results as a collector (for final commands) + */ +public abstract class AbstractQueryElasticSearchCollector extends AbstractCommand<Results, Integer> implements + Collector<Results> { + + + private final ResultsLoaderFactory resultsLoaderFactory; + private final ApplicationEntityIndex entityIndex; + private final ApplicationScope applicationScope; + private final SearchEdge indexScope; + private final SearchTypes searchTypes; + private final Query query; + private int limit; + + + + protected AbstractQueryElasticSearchCollector( final 
ApplicationEntityIndex entityIndex, + final ApplicationScope applicationScope, final SearchEdge indexScope, + final SearchTypes searchTypes, final Query query ) { + this.entityIndex = entityIndex; + this.applicationScope = applicationScope; + this.indexScope = indexScope; + this.searchTypes = searchTypes; + this.query = query; + this.resultsLoaderFactory = getResultsLoaderFactory(); + } + + + @Override + public Observable<Results> call( final Observable<Id> idObservable ) { + final Iterable<Results> executor = + new ElasticSearchQueryExecutor( resultsLoaderFactory, entityIndex, applicationScope, indexScope, searchTypes, + query.withLimit( limit )); + + return Observable.from(executor); + } + + + /** + * Get the results loader factor + * @return + */ + protected abstract ResultsLoaderFactory getResultsLoaderFactory(); + + @Override + protected Class<Integer> getCursorClass() { + return Integer.class; + } + + + @Override + public void setLimit( final int limit ) { + this.limit = limit; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryCollectionElasticSearchCollector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryCollectionElasticSearchCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryCollectionElasticSearchCollector.java new file mode 100644 index 0000000..2bbcc48 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryCollectionElasticSearchCollector.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch; + + +import org.apache.usergrid.corepersistence.ManagerCache; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ResultsLoaderFactory; +import org.apache.usergrid.persistence.EntityRef; +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.SearchTypes; + +import com.google.inject.Inject; + + +/** + * Command for querying connections + */ +public class QueryCollectionElasticSearchCollector extends AbstractQueryElasticSearchCollector { + + private final ManagerCache managerCache; + private final EntityRef headEntity; + private final String connectionName; + + @Inject + protected QueryCollectionElasticSearchCollector( final ApplicationEntityIndex entityIndex, + final ApplicationScope applicationScope, + final SearchEdge indexScope, final SearchTypes searchTypes, + final Query query, final ManagerCache managerCache, + final EntityRef headEntity, final String connectionName ) { + super( entityIndex, applicationScope, indexScope, searchTypes, 
query ); + this.managerCache = managerCache; + this.headEntity = headEntity; + this.connectionName = connectionName; + } + + + @Override + protected ResultsLoaderFactory getResultsLoaderFactory() { + return new ConnectionResultsLoaderFactoryImpl( managerCache, headEntity, connectionName ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryConnectionElasticSearchCollector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryConnectionElasticSearchCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryConnectionElasticSearchCollector.java new file mode 100644 index 0000000..0e2f221 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryConnectionElasticSearchCollector.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch; + + +import org.apache.usergrid.corepersistence.ManagerCache; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl; +import org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ResultsLoaderFactory; +import org.apache.usergrid.persistence.EntityRef; +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.SearchTypes; + +import com.google.inject.Inject; + + +/** + * Command for querying connections + */ +public class QueryConnectionElasticSearchCollector extends AbstractQueryElasticSearchCollector { + + private final ManagerCache managerCache; + private final EntityRef headEntity; + private final String connectionName; + + @Inject + protected QueryConnectionElasticSearchCollector( final ApplicationEntityIndex entityIndex, + final ApplicationScope applicationScope, + final SearchEdge indexScope, final SearchTypes searchTypes, + final Query query, final ManagerCache managerCache, + final EntityRef headEntity, final String connectionName ) { + super( entityIndex, applicationScope, indexScope, searchTypes, query ); + this.managerCache = managerCache; + this.headEntity = headEntity; + this.connectionName = connectionName; + } + + + @Override + protected ResultsLoaderFactory getResultsLoaderFactory() { + return new ConnectionResultsLoaderFactoryImpl( managerCache, headEntity, connectionName ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionRefsVerifier.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionRefsVerifier.java new file mode 100644 index 0000000..4b67ae8 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionRefsVerifier.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.usergrid.persistence.EntityRef; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.SimpleEntityRef; +import org.apache.usergrid.persistence.model.entity.Id; + + +public class CollectionRefsVerifier extends VersionVerifier { + + + + @Override + public Results getResults( final Collection<Id> ids ) { + List<EntityRef> refs = new ArrayList<EntityRef>(ids.size()); + for ( Id id : ids ) { + refs.add( new SimpleEntityRef( id.getType(), id.getUuid() ) ); + } + return Results.fromRefList( refs ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java new file mode 100644 index 0000000..f9dd4e1 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; + + +import org.apache.usergrid.corepersistence.ManagerCache; +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.SearchEdge; + + +/** + * Factory for creating results + */ +public class CollectionResultsLoaderFactoryImpl implements ResultsLoaderFactory { + + private final ManagerCache managerCache; + + + public CollectionResultsLoaderFactoryImpl( final ManagerCache managerCache ) { + this.managerCache = managerCache; + } + + + @Override + public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope, final Query.Level resultsLevel ) { + + ResultsVerifier verifier; + + if ( resultsLevel == Query.Level.REFS ) { + verifier = new CollectionRefsVerifier(); + } + else if ( resultsLevel == Query.Level.IDS ) { + verifier = new IdsVerifier(); + } + else { + verifier = new EntityVerifier(Query.MAX_LIMIT); + } + + return new FilteringLoader( managerCache, verifier, applicationScope, scope ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionRefsVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionRefsVerifier.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionRefsVerifier.java new file mode 100644 index 0000000..bffb53a --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionRefsVerifier.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.usergrid.persistence.ConnectionRef; +import org.apache.usergrid.persistence.EntityRef; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl; +import org.apache.usergrid.persistence.model.entity.Id; + +import static org.apache.usergrid.persistence.SimpleEntityRef.ref; + + +/** + * Verifier for creating connections + */ +public class ConnectionRefsVerifier extends VersionVerifier { + + + private final EntityRef ownerId; + private final String connectionType; + + + public ConnectionRefsVerifier( final EntityRef ownerId, final String connectionType ) { + this.ownerId = ownerId; + this.connectionType = connectionType; + } + + @Override + public Results getResults( final Collection<Id> ids ) { + List<ConnectionRef> refs = new ArrayList<>(); + for ( Id id : ids ) { + refs.add( new ConnectionRefImpl( ownerId, connectionType, ref(id.getType(), id.getUuid()) )); + } + + return Results.fromConnections( refs ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java new file mode 100644 index 0000000..707e933 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; + + +import org.apache.usergrid.corepersistence.ManagerCache; +import org.apache.usergrid.persistence.EntityRef; +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.SearchEdge; + + +/** + * Factory for creating results + */ +public class ConnectionResultsLoaderFactoryImpl implements ResultsLoaderFactory { + + private final ManagerCache managerCache; + private final EntityRef ownerId; + private final String connectionType; + + + public ConnectionResultsLoaderFactoryImpl( final ManagerCache managerCache, final EntityRef ownerId, + final String connectionType ) { + this.managerCache = managerCache; + this.ownerId = ownerId; + this.connectionType = connectionType; + } + + + @Override + public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope, final Query.Level resultsLevel ) { + + ResultsVerifier verifier; + + if ( resultsLevel == Query.Level.REFS ) { + verifier = new ConnectionRefsVerifier( ownerId, connectionType ); + } + else if ( resultsLevel == Query.Level.IDS ) { + verifier = new 
ConnectionRefsVerifier( ownerId, connectionType );; + } + else { + verifier = new EntityVerifier(Query.MAX_LIMIT); + } + + return new FilteringLoader( managerCache, verifier, applicationScope, scope ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ElasticSearchQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ElasticSearchQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ElasticSearchQueryExecutor.java new file mode 100644 index 0000000..c7e8f56 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ElasticSearchQueryExecutor.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; + + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.usergrid.corepersistence.results.QueryExecutor; +import org.apache.usergrid.persistence.index.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + +import com.google.common.base.Optional; + + +public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<Results> { + + private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class ); + + private final ResultsLoaderFactory resultsLoaderFactory; + + private final ApplicationScope applicationScope; + + private final ApplicationEntityIndex entityIndex; + + private final SearchEdge indexScope; + + private final SearchTypes types; + + private final Query query; + + + private Results currentResults; + + private boolean moreToLoad = true; + + + public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final ApplicationEntityIndex entityIndex, + final ApplicationScope applicationScope, final SearchEdge indexScope, + final SearchTypes types, final Query query ) { + this.resultsLoaderFactory = resultsLoaderFactory; + this.applicationScope = applicationScope; + this.entityIndex = entityIndex; + this.indexScope = indexScope; + this.types = types; + + //we must deep copy the query passed. Otherwise we will modify it's state with cursors. Won't fix, not relevant + //once we start subscribing to streams. + this.query = new Query(query); + } + + + @Override + public Iterator<Results> iterator() { + return this; + } + + + private void loadNextPage() { + // Because of possible stale entities, which are filtered out by buildResults(), + // we loop until the we've got enough results to satisfy the query limit. 
+ + final int maxQueries = 10; // max re-queries to satisfy query limit + + final int originalLimit = query.getLimit(); + + Results results = null; + int queryCount = 0; + + + CandidateResults crs = null; + + while ( queryCount++ < maxQueries ) { + + crs = getCandidateResults( query ); + + + logger.debug( "Calling build results with crs {}", crs ); + results = buildResults( indexScope, query, crs ); + + /** + * In an edge case where we delete stale entities, we could potentially get less results than expected. + * This will only occur once during the repair phase. + * We need to ensure that we short circuit before we overflow the requested limit during a repair. + */ + if ( crs.isEmpty() || !crs.hasOffset() || results.size() > 0 ) { // no results, no cursor, can't get more + break; + } + + + //we didn't load anything, but there was a cursor, this means a read repair occured. We have to short + //circuit to avoid over returning the result set + + + // need to query for more + // ask for just what we need to satisfy, don't want to exceed limit + query.setOffsetFromCursor(results.getCursor()); + query.setLimit( originalLimit - results.size() ); + + logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] { + originalLimit, query.getLimit(), queryCount + } ); + } + + //now set our cursor if we have one for the next iteration + if ( results.hasCursor() ) { + query.setOffsetFromCursor(results.getCursor()); + moreToLoad = true; + } + + else { + moreToLoad = false; + } + + + //set our select subjects into our query if provided + if(crs != null){ + query.setSelectSubjects( crs.getGetFieldMappings() ); + } + + + //set our current results and the flag + this.currentResults = results; + } + + + /** + * Get the candidates or load the cursor, whichever we require + * @param query + * @return + */ + private CandidateResults getCandidateResults(final Query query){ + final Optional<Integer> cursor = query.getOffset(); + final String queryToExecute = 
query.getQl().or("select *"); + + CandidateResults results = cursor.isPresent() + ? entityIndex.search( indexScope, types, queryToExecute, query.getLimit() , cursor.get()) + : entityIndex.search( indexScope, types, queryToExecute, query.getLimit()); + + return results; + } + + + /** + * Build results from a set of candidates, and discard those that represent stale indexes. + * + * @param query Query that was executed + * @param crs Candidates to be considered for results + */ + private Results buildResults( final SearchEdge indexScope, final Query query, final CandidateResults crs ) { + + logger.debug( "buildResults() from {} candidates", crs.size() ); + + //get an instance of our results loader + final ResultsLoader resultsLoader = + this.resultsLoaderFactory.getLoader( applicationScope, indexScope, query.getResultsLevel() ); + + //load the results + final Results results = resultsLoader.loadResults(crs); + + //signal for post processing + resultsLoader.postProcess(); + + //set offset into query + if(crs.getOffset().isPresent()) { + query.setOffset(crs.getOffset().get()); + }else{ + query.clearOffset(); + } + results.setCursorFromOffset( query.getOffset() ); + + logger.debug( "Returning results size {}", results.size() ); + + return results; + } + + + @Override + public boolean hasNext() { + + //we've tried to load and it's empty and we have more to load, load the next page + if ( currentResults == null ) { + //there's nothing left to load, nothing to do + if ( !moreToLoad ) { + return false; + } + + //load the page + + loadNextPage(); + } + + + //see if our current results are not null + return currentResults != null; + } + + + @Override + public Results next() { + if ( !hasNext() ) { + throw new NoSuchElementException( "No more results present" ); + } + + final Results toReturn = currentResults; + + currentResults = null; + + return toReturn; + } + + @Override + public void remove() { + throw new RuntimeException("Remove not implemented!!"); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/EntityVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/EntityVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/EntityVerifier.java new file mode 100644 index 0000000..7be8aa4 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/EntityVerifier.java @@ -0,0 +1,127 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. The ASF licenses this file to You + * * under the Apache License, Version 2.0 (the "License"); you may not + * * use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. For additional information regarding + * * copyright in this work, please see the NOTICE file in the top level + * * directory of this distribution. 
+ * + */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; +import org.apache.usergrid.persistence.Entity; +import org.apache.usergrid.persistence.EntityFactory; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.fasterxml.uuid.UUIDComparator; +import com.google.common.base.Optional; + + +/** + * A loader that verifies versions are correct in cassandra and match elasticsearch + */ +public class EntityVerifier implements ResultsVerifier { + + private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class ); + + private EntitySet ids; + + private Map<Id, org.apache.usergrid.persistence.model.entity.Entity> entityMapping; + + + public EntityVerifier( final int maxSize ) { + this.entityMapping = new HashMap<>( maxSize ); + } + + + @Override + public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) { + ids = ecm.load( idsToLoad ).toBlocking().last(); + logger.debug("loadResults() asked for {} ids and got {}", idsToLoad.size(), ids.size()); + } + + + @Override + public boolean isValid( final CandidateResult candidateResult ) { + final Id entityId = candidateResult.getId(); + + final MvccEntity savedEntity = ids.getEntity( entityId ); + + //version wasn't found deindex + if ( savedEntity == null ) { + logger.warn( "Version for Entity {}:{} not found", entityId.getType(), 
entityId.getUuid() ); + return false; + } + + final UUID candidateVersion = candidateResult.getVersion(); + final UUID savedVersion = savedEntity.getVersion(); + + if ( UUIDComparator.staticCompare( savedVersion, candidateVersion ) > 0 ) { + logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] { + entityId.getUuid(), entityId.getType(), candidateVersion, savedEntity + } ); + + return false; + } + + + final Optional<org.apache.usergrid.persistence.model.entity.Entity> entity = savedEntity.getEntity(); + + if ( !entity.isPresent() ) { + logger.warn( "Entity uuid:{} version v:{} is deleted but indexed, this is a bug ", + entityId.getUuid(), savedEntity.getEntity() ); + return false; + } + + entityMapping.put( entityId, entity.get() ); + + return true; + } + + + @Override + public Results getResults( final Collection<Id> ids ) { + + final List<Entity> ugEntities = new ArrayList<>( ids.size() ); + + for ( final Id id : ids ) { + final org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityMapping.get( id ); + + Entity entity = EntityFactory.newEntity( id.getUuid(), id.getType() ); + + Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity ); + entity.addProperties( entityMap ); + ugEntities.add( entity ); + } + + return Results.fromEntities( ugEntities ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/FilteringLoader.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/FilteringLoader.java new file mode 100644 index 0000000..c0afe92 --- /dev/null +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/FilteringLoader.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; + + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.ManagerCache; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexBatch; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.index.CandidateResults; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.fasterxml.uuid.UUIDComparator; +import 
com.google.common.base.Function; +import com.google.common.collect.Collections2; + + +public class FilteringLoader implements ResultsLoader { + + private static final Logger logger = LoggerFactory.getLogger( FilteringLoader.class ); + + private final ManagerCache managerCache; + private final ResultsVerifier resultsVerifier; + private final ApplicationScope applicationScope; + private final SearchEdge indexScope; + private final EntityIndexBatch indexBatch; + + + /** + * Create an instance of a filter loader + * + * @param managerCache The manager cache to load + * @param resultsVerifier The verifier to verify the candidate results + * @param applicationScope The application scope to perform the load + * @param indexScope The index scope used in the search + */ + protected FilteringLoader( final ManagerCache managerCache, final ResultsVerifier resultsVerifier, + final ApplicationScope applicationScope, final SearchEdge indexScope ) { + + this.managerCache = managerCache; + this.resultsVerifier = resultsVerifier; + this.applicationScope = applicationScope; + this.indexScope = indexScope; + + final ApplicationEntityIndex index = managerCache.getEntityIndex( applicationScope ); + + indexBatch = index.createBatch(); + } + + + @Override + public Results loadResults( final CandidateResults crs ) { + + + if ( crs.size() == 0 ) { + return new Results(); + } + + + // For each entity, holds the index it appears in our candidates for keeping ordering correct + final Map<Id, Integer> orderIndex = new HashMap<>( crs.size() ); + + // Maps the entity ids to our candidates + final Map<Id, CandidateResult> maxCandidateMapping = new HashMap<>( crs.size() ); + + + final Iterator<CandidateResult> iter = crs.iterator(); + + + // TODO, in this case we're "optimizing" due to the limitations of collection scope. + // Perhaps we should change the API to just be an application, then an "owner" scope? + + // Go through the candidates and group them by scope for more efficient retrieval. 
+ // Also remove duplicates before we even make a network call + for ( int i = 0; iter.hasNext(); i++ ) { + + final CandidateResult currentCandidate = iter.next(); + + final Id entityId = currentCandidate.getId(); + + //check if we've seen this candidate by id + final CandidateResult previousMax = maxCandidateMapping.get( entityId ); + + //its not been seen, save it + if ( previousMax == null ) { + maxCandidateMapping.put( entityId, currentCandidate ); + orderIndex.put( entityId, i ); + continue; + } + + //we have seen it, compare them + + final UUID previousMaxVersion = previousMax.getVersion(); + + final UUID currentVersion = currentCandidate.getVersion(); + + + final CandidateResult toRemove; + final CandidateResult toKeep; + + //current is newer than previous. Remove previous and keep current + if ( UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ) { + toRemove = previousMax; + toKeep = currentCandidate; + } + //previously seen value is newer than current. Remove the current and keep the previously seen value + else { + toRemove = currentCandidate; + toKeep = previousMax; + } + + //this is a newer version, we know we already have a stale entity, add it to be cleaned up + + + //de-index it + logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] { + entityId.getUuid(), entityId.getType(), toRemove.getVersion(), toKeep.getVersion() + } ); + + //deindex this document, and remove the previous maxVersion + //we have to deindex this from our ownerId, since this is what gave us the reference + indexBatch.deindex( indexScope, toRemove ); + + + //TODO, fire the entity repair cleanup task here instead of de-indexing + + //replace the value with a more current version + maxCandidateMapping.put( entityId, toKeep ); + orderIndex.put( entityId, i ); + } + + + //now everything is ordered, and older versions are removed. 
Batch fetch versions to verify + // existence and correct versions + + final TreeMap<Integer, Id> sortedResults = new TreeMap<>(); + + + final Collection<Id> idsToLoad = + Collections2.transform( maxCandidateMapping.values(), new Function<CandidateResult, Id>() { + @Nullable + @Override + public Id apply( @Nullable final CandidateResult input ) { + //NOTE this is never null, we won't need to check + return input.getId(); + } + } ); + + + //now using the scope, load the collection + + // Get the collection scope and batch load all the versions. We put all entities in + // app/app for easy retrieval/ unless persistence changes, we never want to read from + // any scope other than the app, app, scope name scope + // final CollectionScope collScope = new CollectionScopeImpl( + // applicationScope.getApplication(), applicationScope.getApplication(), scopeName); + + final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( applicationScope ); + + + //load the results into the loader for this scope for validation + resultsVerifier.loadResults( idsToLoad, ecm ); + + //now let the loader validate each candidate. For instance, the "max" in this candidate + //could still be a stale result, so it needs validated + for ( final Id requestedId : idsToLoad ) { + + final CandidateResult cr = maxCandidateMapping.get( requestedId ); + + //ask the loader if this is valid, if not discard it and de-index it + if ( !resultsVerifier.isValid( cr ) ) { + indexBatch.deindex( indexScope, cr ); + continue; + } + + //if we get here we're good, we need to add this to our results + final int candidateIndex = orderIndex.get( requestedId ); + + sortedResults.put( candidateIndex, requestedId ); + } + + + // NOTE DO NOT execute the batch here. 
+ // It changes the results and we need consistent paging until we aggregate all results + return resultsVerifier.getResults( sortedResults.values() ); + } + + + @Override + public void postProcess() { + this.indexBatch.execute(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/IdsVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/IdsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/IdsVerifier.java new file mode 100644 index 0000000..4d2bd55 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/IdsVerifier.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.model.entity.Id; + + +public class IdsVerifier extends VersionVerifier { + + @Override + public Results getResults( final Collection<Id> ids ) { + + final List<UUID> returnIds = new ArrayList<>( ids.size() ); + + for ( final Id id : ids ) { + returnIds.add( id.getUuid() ); + } + + + return Results.fromIdList( returnIds ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoader.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoader.java new file mode 100644 index 0000000..fa0e71f --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoader.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; + + +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.index.CandidateResults; + + +/** + * Interface for loading results + */ +public interface ResultsLoader { + + /** + * Using the candidate results, load our results. Should filter stale results + * @param crs The candidate result set + * @return Results. Null safe, but may be empty + */ + public Results loadResults( final CandidateResults crs); + + /** + * Post process the load operation + */ + public void postProcess(); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoaderFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoaderFactory.java new file mode 100644 index 0000000..14db80e --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoaderFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; + + +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.SearchEdge; + + +/** + * Factory for creating results + */ +public interface ResultsLoaderFactory { + + /** + * Get the loader for results + * @param applicationScope The application scope used to load results + * @param indexScope The index scope used in the search + * @param + */ + ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge indexScope, + final Query.Level resultsLevel ); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsVerifier.java new file mode 100644 index 0000000..68515fa --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsVerifier.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; + + +import java.util.Collection; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.model.entity.Id; + + +public interface ResultsVerifier { + + /** + * Load all the candidate ides for verification + * @param ids The Id's to load + * @param ecm The entity collection manager + */ + public void loadResults(Collection<Id> ids, EntityCollectionManager ecm); + + /** + * Return true if the candidate result is a valid result that should be retained. 
If it should + * not it should also be removed from the list of possible return values in this loader + * @param candidateResult + */ + public boolean isValid(CandidateResult candidateResult); + + + /** + * Load the result set with the given ids + * @return + */ + public Results getResults(Collection<Id> ids); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/VersionVerifier.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/VersionVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/VersionVerifier.java new file mode 100644 index 0000000..58e4296 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/VersionVerifier.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl; + + +import java.util.Collection; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.VersionSet; +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.fasterxml.uuid.UUIDComparator; + + +/** + * A loader that verifies versions are correct in Cassandra and match ElasticSearch. A candidate is rejected when no latest version was loaded for its id, or when the loaded latest version is newer than the candidate's version + */ +public abstract class VersionVerifier implements ResultsVerifier { + + private static final Logger logger = LoggerFactory.getLogger( VersionVerifier.class ); + + private VersionSet ids; + + + @Override + public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) { + ids = ecm.getLatestVersion( idsToLoad ).toBlocking().last(); + } + + + @Override + public boolean isValid( final CandidateResult candidateResult ) { + final Id entityId = candidateResult.getId(); + + final MvccLogEntry version = ids.getMaxVersion( entityId ); + + //version wasn't found, deindex + if ( version == null ) { + logger.warn( "Version for Entity {}:{} not found", + entityId.getType(), entityId.getUuid() ); + + return false; + } + + final UUID savedVersion = version.getVersion(); + + if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) { + logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", + new Object[] { + entityId.getUuid(), + entityId.getType(), + candidateResult.getVersion(), + savedVersion + } ); + + return false; + } + + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCollector.java 
---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCollector.java new file mode 100644 index 0000000..1504f26 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCollector.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.command.read.entity; + + +import java.io.Serializable; +import java.util.Map; + +import org.apache.usergrid.corepersistence.command.read.AbstractCommand; +import org.apache.usergrid.corepersistence.command.read.Collector; +import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; +import org.apache.usergrid.persistence.EntityFactory; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; + + +/** + * Loads entities from a set of Ids. Ids are buffered into batches of the size given to setLimit, and every loaded batch is emitted as one Results + * + * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification + */ +public class EntityLoadCollector extends AbstractCommand<Results, Serializable> implements Collector<Results> { + + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + + //TODO get rid of this when merged into 2.0 dev + private final ApplicationScope applicationScope; + private int resultSize; + + + public EntityLoadCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory, + final ApplicationScope applicationScope ) { + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.applicationScope = applicationScope; + } + + + @Override + protected Class<Serializable> getCursorClass() { + return null; + } + + + @Override + public Observable<Results> call( final Observable<Id> observable ) { + + + /** + * A bit kludgy from old 1.0 -> 2.0 apis. 
Refactor this as we clean up our lower levels and create new results + * objects + */ + + final EntityCollectionManager entityCollectionManager = + entityCollectionManagerFactory.createCollectionManager( applicationScope ); + + final Observable<EntitySet> entitySetObservable = observable.buffer( resultSize ).flatMap( + bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> entityCollectionManager.load( ids ) ) ); + + + final Observable<Results> resultsObservable = entitySetObservable + + .flatMap( entitySet -> { + + //get our entities and filter missing ones, then collect them into a results object + final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() ); + + //convert them to our old entity model, then drop the nulls mapEntity returns for entities that weren't found + return mvccEntityObservable.map( mvccEntity -> mapEntity( mvccEntity ) ).filter( entity -> entity != null ) + + //convert them to a list, then map them into results + .toList().map( entities -> { + final Results results = Results.fromEntities( entities ); + results.setCursor( generateCursor() ); + + return results; + } ) + //if no results are present, return an empty results + .singleOrDefault( new Results( ) ); + } ); + + + return resultsObservable; + } + + /** + * Map a new cp entity to an old entity. 
May be null if not present + */ + + + private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity mvccEntity ) { + if ( !mvccEntity.getEntity().isPresent() ) { + return null; + } + + + final Entity cpEntity = mvccEntity.getEntity().get(); + final Id entityId = cpEntity.getId(); + + org.apache.usergrid.persistence.Entity entity = + EntityFactory.newEntity( entityId.getUuid(), entityId.getType() ); + + Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity ); + entity.addProperties( entityMap ); + + return entity; + } + + + @Override + public void setLimit( final int limit ) { + this.resultSize = limit; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java deleted file mode 100644 index 08193ef..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.command.read.entity; - - -import java.io.Serializable; -import java.util.Map; - -import org.apache.usergrid.corepersistence.command.read.AbstractCommand; -import org.apache.usergrid.corepersistence.command.read.CollectCommand; -import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; -import org.apache.usergrid.persistence.EntityFactory; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.EntitySet; -import org.apache.usergrid.persistence.collection.MvccEntity; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import rx.Observable; - - -/** - * Loads entities from a set of Ids. 
- * - * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification - */ -public class EntityLoadCommand extends AbstractCommand<Results, Serializable> implements CollectCommand<Results> { - - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - - //TODO get rid of this when merged into 2.0 dev - private final ApplicationScope applicationScope; - private int resultSize; - - - public EntityLoadCommand( final EntityCollectionManagerFactory entityCollectionManagerFactory, - final ApplicationScope applicationScope ) { - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - this.applicationScope = applicationScope; - } - - - @Override - protected Class<Serializable> getCursorClass() { - return null; - } - - - @Override - public Observable<Results> call( final Observable<Id> observable ) { - - - /** - * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results - * objects - */ - - final EntityCollectionManager entityCollectionManager = - entityCollectionManagerFactory.createCollectionManager( applicationScope ); - - final Observable<EntitySet> entitySetObservable = observable.buffer( resultSize ).flatMap( - bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> entityCollectionManager.load( ids ) ) ); - - - return entitySetObservable - - .flatMap( entitySet -> { - - //get our entites and filter missing ones, then collect them into a results object - final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() ); - - //convert them to our old entity model, then filter nulls, meaning they weren't found - return mvccEntityObservable.map( mvccEntity -> mapEntity( mvccEntity ) ).filter( - entity -> entity == null ) - - //convert them to a list, then map them into results - .toList().map( entities -> { - final Results results = Results.fromEntities( entities ); - results.setCursor( 
generateCursor() ); - - return results; - } ); - } ); - } - - /** - * Map a new cp entity to an old entity. May be null if not present - */ - - - private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity mvccEntity ) { - if ( !mvccEntity.getEntity().isPresent() ) { - return null; - } - - - final Entity cpEntity = mvccEntity.getEntity().get(); - final Id entityId = cpEntity.getId(); - - org.apache.usergrid.persistence.Entity entity = - EntityFactory.newEntity( entityId.getUuid(), entityId.getType() ); - - Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity ); - entity.addProperties( entityMap ); - - return entity; - } - - - @Override - public void setLimit( final int resultSize ) { - this.resultSize = resultSize; - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java index 9ccb969..3b6633b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java @@ -22,6 +22,8 @@ package org.apache.usergrid.corepersistence.command.read.graph; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import com.google.inject.Inject; + import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName; @@ -36,6 +38,7 @@ public class ReadGraphCollectionCommand extends AbstractReadGraphCommand { /** * Create a new instance of our command */ + @Inject public ReadGraphCollectionCommand( 
final GraphManagerFactory graphManagerFactory, final String collectionName ) { super( graphManagerFactory ); this.collectionName = collectionName; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java index adebd45..26b30e7 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java @@ -22,6 +22,8 @@ package org.apache.usergrid.corepersistence.command.read.graph; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import com.google.inject.Inject; + import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName; @@ -36,6 +38,7 @@ public class ReadGraphConnectionCommand extends AbstractReadGraphCommand { /** * Create a new instance of our command */ + @Inject public ReadGraphConnectionCommand( final GraphManagerFactory graphManagerFactory, final String connectionName ) { super( graphManagerFactory ); this.connectionName = connectionName; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java index cf0764e..ec4271a 
100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java @@ -25,7 +25,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import org.apache.usergrid.corepersistence.command.CommandBuilder; -import org.apache.usergrid.corepersistence.command.read.entity.EntityLoadCommand; +import org.apache.usergrid.corepersistence.command.read.entity.EntityLoadCollector; import org.apache.usergrid.persistence.EntityRef; import org.apache.usergrid.persistence.Results; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; @@ -89,7 +89,7 @@ public abstract class AbstractGraphQueryExecutor implements QueryExecutor { //construct our results to be observed later. This is a cold observable final Observable<Results> resultsObservable = - commandBuilder.build( new EntityLoadCommand( entityCollectionManagerFactory, applicationScope ) ); + commandBuilder.build( new EntityLoadCollector( entityCollectionManagerFactory, applicationScope ) ); this.observableIterator = resultsObservable.toBlocking().getIterator();