WIP and a mess, squash later.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c4f65484 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c4f65484 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c4f65484 Branch: refs/heads/USERGRID-528 Commit: c4f654847b8041a94176996feffa4060dd638819 Parents: 4c59f09 Author: Todd Nine <[email protected]> Authored: Fri Mar 20 19:10:58 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Fri Mar 20 19:10:58 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/io/read/CommandBuilder.java | 36 +++--- .../io/read/EntityIndexCommands.java | 119 +++++++++++++++++++ .../corepersistence/io/state/CursorCache.java | 34 ++++++ .../corepersistence/rx/impl/CollectUntil.java | 60 ++++++++++ .../java/org/apache/usergrid/TempExample.java | 74 ++++++++++++ .../rx/impl/CollectUntilTest.java | 62 ++++++++++ 6 files changed, 368 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4f65484/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java index 119fc0e..828cbc9 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.usergrid.corepersistence.io.reduce.StreamReducer; +import org.apache.usergrid.corepersistence.io.state.CursorCache; import 
org.apache.usergrid.persistence.model.entity.Id; import rx.Observable; @@ -35,31 +36,32 @@ import rx.functions.Func1; */ public class CommandBuilder { - private final Id root; - private final List<Command<Id>> commandList; + private CursorCache cache; + private final Observable<Id> pathObservable; - public CommandBuilder( final Id root ) {this.root = root; - commandList = new ArrayList<>( ); - } - public void addIntermediateCommand(final Command<Id> command){ - commandList.add( command ); + /** Creates a builder whose path observable is {@code Observable.just( root )}. */ public CommandBuilder( final Id root ) { + pathObservable = Observable.just( root ); } - public <T> void addFinalCommand( final Command<T> command, final StreamReducer<T> reducer ) { - - Observable.just("foo").flatMap( new Func1<String, Observable<?>>() { - @Override - public Observable<?> call( final String s ) { - return null; - } - }; + /** + * Set our cache. The reference is stored in the {@code cache} field; no line visible in this hunk reads it back. + * @param cache the cursor cache to store + */ + public void setCache(final CursorCache cache){ + this.cache = cache; } - public List<Command<Id>> getCommands(){ - return commandList; + /** + * Returns the observable that contains the current traversal operations + * @return the {@code pathObservable} field assigned in the constructor + */ + public Observable<Id> getPathObservable(){ + return pathObservable; } + + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4f65484/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java new file mode 100644 index 0000000..d6f4e93 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.io.read; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.IndexScope; +import org.apache.usergrid.persistence.index.SearchTypes; +import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; +import org.apache.usergrid.persistence.index.query.CandidateResult; +import org.apache.usergrid.persistence.index.query.CandidateResults; +import org.apache.usergrid.persistence.index.query.Query; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; +import rx.functions.Action2; +import rx.functions.Func1; + + +/** Static factory methods that return Rx functions and actions for index searches, plus two small holder classes used by them. */ public class EntityIndexCommands { + + +// /** +// * Perform a search of all the entities, and then return the observable of search results +// * @param index +// * @param edgeType +// * @param types +// * @param query +// * @return +// */ +// public static Func1<Id, SearchResults> searchEntities(final ApplicationEntityIndex index,final String edgeType, final SearchTypes types, final String query ){ +// +// return nodeId -> { +// +// } +// } + /** + * Construct an indexScope from
the input id type + * @param type + * @return + */ + public static Func1<Id, IndexScope> createSearchScope(final String type){ + return id -> new IndexScopeImpl( id, type ); + } + /** + * Get our candidate results. The returned function calls {@code index.search} with the incoming scope, the given types and the query parsed by {@code Query.fromQLNullSafe}. + * @param index The index that is searched + * @param types The types to return + * @param query The query string handed to {@code Query.fromQLNullSafe} + * @return A function mapping an {@code IndexScope} to the {@code CandidateResults} of the search + */ + public static Func1<IndexScope, CandidateResults> getCandidates(final ApplicationEntityIndex index, final SearchTypes types, final String query){ + return indexScope -> index.search( indexScope, types, Query.fromQLNullSafe( query ) ); + } + + + /** + * Flattens candidate results into a stream of individual results by passing them to {@code Observable.from} + * @return A function mapping a {@code CandidateResults} to an observable of its {@code CandidateResult} elements + */ + public static Func1<CandidateResults, Observable<CandidateResult>> flattenCandidates(){ + return (CandidateResults candidateResults) -> Observable.from( candidateResults ); + } + + + /** Returns an action that adds every entity of the given {@code EntitySet} to the given {@code SearchResults}. */ public static Action2<SearchResults, EntitySet> collectSet(){ + return (searchResults, entitySet) -> { + searchResults.addEntities( entitySet.entities ); + }; + } + + + + /** Mutable holder for collected entities plus a cursor string; this class declares no getters, and {@code cursor} is written but never read here. */ public static class SearchResults{ + private final List<Entity> entities; + private String cursor; + + + /** @param maxSize used only as the initial capacity of the backing list; nothing here enforces it as an upper bound */ public SearchResults(final int maxSize) {entities = new ArrayList<>(maxSize);} + + public void addEntities(final Collection<Entity> entities){ + this.entities.addAll( entities ); + + } + public void setCursor(final String cursor){ + this.cursor = cursor; + } + } + + /** Wrapper around a list of entities; the list is stored by reference, not copied. */ public static class EntitySet{ + private final List<Entity> entities; + + + public EntitySet( final List<Entity> entities ) {this.entities = entities;} + } + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4f65484/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/state/CursorCache.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/state/CursorCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/state/CursorCache.java new file mode 100644 index 0000000..8a5ca49 --- /dev/null 
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/state/CursorCache.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.io.state; + + +/** + * A class that represents a cursor cache. As committed here it is an empty placeholder: it declares no fields and only a no-argument constructor. + */ +public class CursorCache { + + /** No-argument constructor; the body is empty. */ public CursorCache(){ + + } + +} + + http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4f65484/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java new file mode 100644 index 0000000..a2cb754 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.rx.impl; + + +import rx.Observable; +import rx.functions.Action2; +import rx.functions.Func0; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.internal.operators.OperatorScan; + + +/** + * An operation for performing a collect until the predicate returns true. The state object produced by {@code stateFactory} is mutated in place by {@code collector} for each source value, and that same instance is what the accumulator returns. + * @param <T> type of the source values + * @param <R> type of the state object + */ +public class CollectUntil<T, R> implements Observable.Transformer<T, R> { + + final Func0<R> stateFactory; + final Action2<R, ? super T> collector; + final Func1<R, Boolean> predicate; + + + /** Stores the state factory, the per-value collector and the stop predicate for later use in {@code call}. */ public CollectUntil( final Func0<R> stateFactory, final Action2<R, ? 
super T> collector, + final Func1<R, Boolean> predicate ) { + this.stateFactory = stateFactory; + this.collector = collector; + this.predicate = predicate; + } + + + /** Lifts an {@code OperatorScan} whose accumulator applies {@code collector} and returns the same state instance, then applies {@code takeUntil( predicate )} to the scanned stream. NOTE(review): {@code OperatorScan} is imported from {@code rx.internal.operators}, which looks like a non-public RxJava package - confirm that depending on it is intended. */ @Override + public Observable<R> call( final Observable<T> tObservable ) { + Func2<R, T, R> accumulator = ( state, value ) -> { + collector.call( state, value ); + return state; + }; + + + return tObservable.lift( new OperatorScan<>( stateFactory, accumulator ) ).takeUntil( predicate ); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4f65484/stack/core/src/test/java/org/apache/usergrid/TempExample.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/TempExample.java b/stack/core/src/test/java/org/apache/usergrid/TempExample.java new file mode 100644 index 0000000..1db450d --- /dev/null +++ b/stack/core/src/test/java/org/apache/usergrid/TempExample.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid; + + +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.SearchTypes; +import org.apache.usergrid.persistence.index.query.CandidateResults; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; + +import static org.apache.usergrid.corepersistence.io.read.EntityIndexCommands.createSearchScope; +import static org.apache.usergrid.corepersistence.io.read.EntityIndexCommands.getCandidates; +import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; + + +/** Scratch example that chains {@code createSearchScope} and {@code getCandidates} onto an observable of a single root id. */ public class TempExample { + + //set our root observable + + + /** Builds the chain from a generated "thing" id, prints each emitted value and blocks for the last one. */ public static void main(String[] args) { + + final Id rootId = createId( "thing" ); + + /* NOTE(review): index is null here, so the function returned by getCandidates will dereference null when the chain is subscribed. */ final ApplicationEntityIndex index = null; + + + final SearchTypes searchType = SearchTypes.fromTypes( "test" ); + + final String query = "select * "; + + final Observable<CandidateResults> observable = Observable.just( rootId ).map( createSearchScope( "type" ) ).map(getCandidates(index, searchType, query)); + + + observable.doOnNext( a -> System.out.println( a) ).toBlocking().last(); + } + + + private static final class ResultsCollector{ + + /** + * Add the candidates to our collection + * @param results the candidate results that are grouped by {@code candidate.getId()} + */ + public void addCandidates(final CandidateResults results ){ + + //TODO, collect the results, removing groups + /* NOTE(review): the statement below is incomplete as committed - collect( ) is given no arguments and there is no terminating semicolon. */ Observable.from( results ).groupBy( candidate -> candidate.getId() ).collect( ) + } + + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4f65484/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java new file mode 100644 index 0000000..ce12429 --- /dev/null +++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.rx.impl; + + +import org.junit.Test; + +import rx.Observable; + +import static org.junit.Assert.assertEquals; + + +/** Exercises {@code CollectUntil} with a simple counting collector. */ public class CollectUntilTest { + + /** Feeds {@code Observable.range( 0, 200 )} through the transformer and asserts that the last emitted collector has a count of exactly 100, the value at which {@code isFull()} returns true. */ @Test + public void testCollectUntil() { + + final CollectUntil<Integer, CountCollector> collectUntil = + new CollectUntil<>( + () -> new CountCollector(), + ( collector, value ) -> collector.mark(), + collector -> collector.isFull() ); + + + final CountCollector collector = Observable.range( 0, 200 ).compose( collectUntil ).toBlocking().last(); + + assertEquals( 100, collector.count ); + } + + + /** Counter that is incremented by {@code mark()} and reports full when {@code count} equals 100. */ private static final class CountCollector { + + private int count; + + + public void mark() { + count++; + } + + + public boolean isFull() { + return count == 100; + } + } +}
