Fixes resume logic by loading then filtering first id
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/413f023e Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/413f023e Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/413f023e Branch: refs/heads/two-dot-o-dev Commit: 413f023e243d8e398db6295b05a9c1f6c8c8feed Parents: 294a7d9 Author: Todd Nine <[email protected]> Authored: Mon May 4 09:11:01 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Mon May 4 09:11:01 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/pipeline/Pipeline.java | 3 +- .../pipeline/read/FilterFactory.java | 9 +++ .../pipeline/read/ReadPipelineBuilderImpl.java | 11 +++- .../pipeline/read/collect/EntityFilter.java | 68 ++++++++++++++++++++ .../read/collect/IdCursorSerializer.java | 41 ++++++++++++ .../persistence/index/usergrid-mappings.json | 1 - 6 files changed, 128 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java index df6a218..26cf346 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor; import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor; import org.apache.usergrid.corepersistence.pipeline.read.Collector; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import com.google.common.base.Optional; @@ -77,7 +78,7 @@ public class Pipeline<R> { public Observable<R> execute(){ - Observable traverseObservable = Observable.just( applicationScope.getApplication() ); + Observable traverseObservable = Observable.just( new FilterResult<>( applicationScope.getApplication(), Optional.absent() )); //build our traversal commands for ( PipelineOperation pipelineOperation : idPipelineOperationList ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java index c465516..a2f1605 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java @@ -20,6 +20,8 @@ package org.apache.usergrid.corepersistence.pipeline.read; +import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityFilter; +import org.apache.usergrid.corepersistence.pipeline.read.collect.IdCursorSerializer; import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter; import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter; import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter; @@ -131,4 +133,11 @@ public interface FilterFactory { * @param entityId The entity id to emit */ EntityIdFilter getEntityIdFilter( final Id entityId ); + + + /** + * Create a new instance of our entity filter + * @return + */ + EntityFilter entityFilter(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java index ffb9f7d..28446ad 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java @@ -26,9 +26,9 @@ import java.util.List; import org.apache.usergrid.corepersistence.pipeline.Pipeline; import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter; import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter; -import org.apache.usergrid.persistence.Entity; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.util.ValidationUtils; +import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import com.google.common.base.Optional; @@ -211,9 +211,14 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { //add our last filter that will generate entities - final Filter<?, Entity> finalFilter = collectorState.getFinalFilter(); + final Filter<?, Entity> entityLoadFilter = collectorState.getFinalFilter(); - filters.add( finalFilter ); + filters.add( entityLoadFilter ); + + //add the filter that skips the first result on resume + final Filter<Entity, Entity> cursorEntityFilter = filterFactory.entityFilter(); + + filters.add( cursorEntityFilter ); //execute our collector http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java new file mode 100644 index 0000000..daf2e7f --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.collect; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; +import org.apache.usergrid.corepersistence.pipeline.read.Filter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; + +import rx.Observable; + + +/** + * A filter that is used when we can potentially serialize pages via cursor. This will filter the first result, only if + * it matches the Id that was set + */ +public class EntityFilter extends AbstractPathFilter<Entity, Entity, Id> implements Filter<Entity, Entity> { + + + @Override + public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Entity>> filterResultObservable ) { + + //filter only the first id, then map into our path for our next pass + + + return filterResultObservable.skipWhile( filterResult -> { + + final Optional<Id> startFromCursor = getSeekValue(); + + return startFromCursor.isPresent() && startFromCursor.get().equals( filterResult.getValue().getId() ); + } ).map( filterResult -> { + + + final Entity entity = filterResult.getValue(); + final Id entityId = entity.getId(); + + return createFilterResult( entity, entityId, filterResult.getPath() ); + } ); + } + + + @Override + protected CursorSerializer<Id> getCursorSerializer() { + return IdCursorSerializer.INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java new file mode 100644 index 0000000..d96b9f2 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.collect; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer; +import org.apache.usergrid.persistence.model.entity.Id; + + +/** + * cursor serializer for Ids + */ +public class IdCursorSerializer extends AbstractCursorSerializer<Id> { + + + public static final IdCursorSerializer INSTANCE = new IdCursorSerializer(); + + @Override + protected Class<Id> getType() { + return Id.class; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json index bee84a2..c22a4ec 100644 --- a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json +++ b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json @@ -73,7 +73,6 @@ }, "string": { "type": "string", - "doc_values": true, "norms": { "enabled": false },
