Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o-dev 69610b053 -> ba9e9dbc7
Updated cursor logic Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/154d69d8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/154d69d8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/154d69d8 Branch: refs/heads/two-dot-o-dev Commit: 154d69d88edd5c458e359e3e7a0b73e3380f29f4 Parents: ff2b3f1 Author: Todd Nine <tn...@apigee.com> Authored: Mon Apr 27 14:26:00 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Mon Apr 27 16:11:00 2015 -0600 ---------------------------------------------------------------------- .../cursor/AbstractCursorSerializer.java | 62 ++++++++++++ .../pipeline/cursor/CursorParseException.java | 30 ++++++ .../pipeline/cursor/CursorSerializer.java | 58 ++--------- .../pipeline/cursor/CursorSerializerUtil.java | 49 +++++++++ .../pipeline/cursor/NoCursorSerializer.java | 55 ++++++++++ .../pipeline/cursor/RequestCursor.java | 69 ++++++++++++- .../pipeline/cursor/ResponseCursor.java | 60 +++++++++-- .../pipeline/read/AbstractFilter.java | 7 +- ...stractQueryElasticSearchCollectorFilter.java | 5 +- .../ElasticsearchCursorSerializer.java | 42 ++++++++ .../pipeline/read/entity/EntityIdFilter.java | 13 ++- .../read/entity/EntityLoadCollectorFilter.java | 8 +- .../graph/AbstractReadGraphEdgeByIdFilter.java | 7 +- .../read/graph/AbstractReadGraphFilter.java | 5 +- .../read/graph/EdgeCursorSerializer.java | 42 ++++++++ .../graph/ReadGraphConnectionByTypeFilter.java | 5 +- .../pipeline/cursor/CursorTest.java | 101 +++++++++++++++++++ .../persistence/core/rx/OrderedMerge.java | 27 +++-- .../persistence/graph/impl/SimpleEdge.java | 16 ++- 19 files changed, 562 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java new file mode 100644 index 0000000..23bb99a --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.cursor; + + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + + +/** + * Abstract serializer other serializers should inherit from makes serialization easier + */ +public abstract class AbstractCursorSerializer<T> implements CursorSerializer<T> { + + + /** + * Intentionally protected. 
Subclasses should be singletons + */ + protected AbstractCursorSerializer() { + + } + + + @Override + public T fromJsonNode( final JsonNode node, final ObjectMapper objectMapper ) { + try { + final Class<? extends T> classType = getType(); + + return objectMapper.treeToValue( node, classType ); + } + catch ( JsonProcessingException e ) { + throw new CursorParseException( "Unable to deserialize value", e ); + } + } + + + @Override + public JsonNode toNode( final ObjectMapper objectMapper, final T value ) { + return objectMapper.valueToTree( value ); + } + + + protected abstract Class<? extends T> getType(); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorParseException.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorParseException.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorParseException.java new file mode 100644 index 0000000..b48cce8 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorParseException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.cursor; + + +/** + * Thrown when we can't parse a cursor + */ +public class CursorParseException extends RuntimeException { + public CursorParseException( final String message, final Throwable cause ) { + super( message, cause ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializer.java index ce2e10a..0f02265 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializer.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializer.java @@ -20,67 +20,23 @@ package org.apache.usergrid.corepersistence.pipeline.cursor; -import java.io.Serializable; - -import com.fasterxml.jackson.core.Base64Variant; -import com.fasterxml.jackson.core.Base64Variants; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.smile.SmileFactory; /** - * A utility to serialize objects to/from cursors + * Interface for cursor serialization */ -public class CursorSerializer { - - - private static final SmileFactory SMILE_FACTORY = new SmileFactory(); - - private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY ); - - private static final Base64Variant VARIANT = Base64Variants.MODIFIED_FOR_URL; - +public interface CursorSerializer<T> { /** 
- * Serialize the serializable object as a cursor + * convert from a JsonNode to a cursor of type T */ - public static String asCursor( final Serializable cursor ) { - - try { - return MAPPER.writer( VARIANT ).writeValueAsString( cursor ); - } - catch ( JsonProcessingException e ) { - throw new CursorParseException( "Unable to serialize cursor", e ); - } - } - - - /** - * Deserialize from the cursor - * @param cursor - * @return - * @throws CursorParseException - */ - public <T extends Serializable> T fromCursor( final String cursor, final Class<T> cursorClass ) throws CursorParseException { - try { - - final JsonParser parser = MAPPER.getFactory().createParser( cursor ); - return MAPPER.reader( VARIANT ).readValue( parser, cursorClass); - } - catch ( Exception e ) { - throw new CursorParseException( "Unable to serialize cursor", e ); - } - } + T fromJsonNode( final JsonNode node, final ObjectMapper objectMapper ); /** - * Thrown when we can't parse a cursor + * Convert the cursor to a jsonNode */ - public static class CursorParseException extends RuntimeException { - public CursorParseException( final String message, final Throwable cause ) { - super( message, cause ); - } - } + JsonNode toNode( final ObjectMapper objectMapper, final T value ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java new file mode 100644 index 0000000..05c1018 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.cursor; + + +import com.fasterxml.jackson.core.Base64Variant; +import com.fasterxml.jackson.core.Base64Variants; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; + + +/** + * A utility to serialize objects to/from cursors + */ +public class CursorSerializerUtil { + + private static final SmileFactory SMILE_FACTORY = new SmileFactory(); + + private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY ); + + private static final Base64Variant VARIANT = Base64Variants.MODIFIED_FOR_URL; + + + public static ObjectMapper getMapper() { + return MAPPER; + } + + + public static Base64Variant getBase64() { + return VARIANT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java new file mode 100644 index 
0000000..1d42df4 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.cursor; + + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + + +/** + * Interface for cursor serialization + * + * TODO, the need for this seems to indicate an issue with our object composition. 
Refactor this away + */ +public class NoCursorSerializer<T> implements CursorSerializer<T> { + + private static final NoCursorSerializer<Object> INSTANCE = new NoCursorSerializer<>(); + + + @Override + public T fromJsonNode( final JsonNode node, final ObjectMapper objectMapper ) { + return null; + } + + + @Override + public JsonNode toNode( final ObjectMapper objectMapper, final T value ) { + return objectMapper.createObjectNode(); + } + + + /** + * convenience for type casting + */ + public static <T> NoCursorSerializer<T> create() { + return ( NoCursorSerializer<T> ) INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java index 99cfd74..b117c21 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java @@ -20,9 +20,16 @@ package org.apache.usergrid.corepersistence.pipeline.cursor; -import java.io.Serializable; +import java.util.Base64; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import com.fasterxml.jackson.core.Base64Variant; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; /** @@ -30,16 +37,70 @@ import com.google.common.base.Optional; */ public class RequestCursor { - public RequestCursor(final Optional<String> cursor){ + /** + * Arbitrary number, just meant to keep us from having a DOS issue + */ + private static final int MAX_SIZE = 
1024; + + private static final int MAX_CURSOR_COUNT = 100; + + private static final ObjectMapper MAPPER = CursorSerializerUtil.getMapper(); + private static final Base64Variant VARIANT = CursorSerializerUtil.getBase64(); + + private final Map<Integer, JsonNode> parsedCursor; + + public RequestCursor( final Optional<String> cursor ) { + if ( cursor.isPresent() ) { + parsedCursor = fromCursor( cursor.get() ); + } + else { + parsedCursor = Collections.EMPTY_MAP; + } } /** * Get the cursor with the specified id */ - public <T extends Serializable> T getCursor( final int id, final Class<T> cursorType ) { - return null; + public <T> T getCursor( final int id, final CursorSerializer<T> serializer ) { + + final JsonNode node = parsedCursor.get( id ); + + return serializer.fromJsonNode( node, MAPPER ); } + + /** + * Deserialize from the cursor as json nodes + */ + private Map<Integer, JsonNode> fromCursor( final String cursor ) throws CursorParseException { + try { + + + Preconditions.checkArgument( cursor.length() <= MAX_SIZE, "Your cursor must be less than " + MAX_SIZE + " chars in length"); + + final byte[] data = Base64.getUrlDecoder().decode( cursor ); + + JsonNode jsonNode = MAPPER.readTree( data ); + + + Preconditions + .checkArgument( jsonNode.size() <= MAX_CURSOR_COUNT, " You cannot have more than " + MAX_CURSOR_COUNT + " cursors" ); + + + Map<Integer, JsonNode> cursors = new HashMap<>(); + + final Iterable<Map.Entry<String, JsonNode>> iterable = () -> jsonNode.fields(); + + for ( final Map.Entry<String, JsonNode> node : iterable ) { + cursors.put( Integer.parseInt( node.getKey() ), node.getValue() ); + } + + return cursors; + } + catch ( Exception e ) { + throw new CursorParseException( "Unable to serialize cursor", e ); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java 
---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java index 0e047ed..e379a34 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java @@ -21,9 +21,16 @@ package org.apache.usergrid.corepersistence.pipeline.cursor; import java.io.Serializable; +import java.util.Base64; import java.util.HashMap; import java.util.Map; +import com.fasterxml.jackson.core.Base64Variant; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + /** * A cursor used in rendering a response @@ -31,26 +38,67 @@ import java.util.Map; public class ResponseCursor { + private static final ObjectMapper MAPPER = CursorSerializerUtil.getMapper(); + private static final Base64Variant VARIANT = CursorSerializerUtil.getBase64(); /** * We use a map b/c some indexes might be skipped */ - private Map<Integer, ? super Serializable> cursors = new HashMap<>(); + private Map<Integer, CursorEntry<?>> cursors = new HashMap<>(); + /** * Set the possible cursor value into the index. DOES NOT parse the cursor. 
This is intentional for performance */ - public <T extends Serializable> void setCursor( final int id, final T cursor ) { - cursors.put( id, cursor ); + public <T extends Serializable> void setCursor( final int id, final T cursor, + final CursorSerializer<T> serializer ) { + + final CursorEntry<T> newEntry = new CursorEntry<>( cursor, serializer ); + cursors.put( id, newEntry ); } - private void ensureCapacity() { + /** + * now we're done, encode as a string + */ + public String encodeAsString() { + try { + final ObjectNode map = MAPPER.createObjectNode(); + + for ( Map.Entry<Integer, CursorEntry<?>> entry : cursors.entrySet() ) { + + final CursorEntry cursorEntry = entry.getValue(); + + final JsonNode serialized = cursorEntry.serializer.toNode( MAPPER, cursorEntry.cursor ); + map.put( entry.getKey().toString(), serialized ); + } + + + final byte[] output = MAPPER.writeValueAsBytes(map); + + //generate a base64 url safe string + return Base64.getUrlEncoder().encodeToString( output ); +// return MAPPER.writer( VARIANT ).writeValueAsString( map ); + + } + catch ( JsonProcessingException e ) { + throw new CursorParseException( "Unable to serialize cursor", e ); + } } - public String encodeAsString() { - return null; + /** + * Internal pointer to the cursor and its serializer + */ + private static final class CursorEntry<T> { + private final T cursor; + private final CursorSerializer<T> serializer; + + + private CursorEntry( final T cursor, final CursorSerializer<T> serializer ) { + this.cursor = cursor; + this.serializer = serializer; + } } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java index a6e7ef1..3564a79 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java @@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read; import java.io.Serializable; +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor; import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -75,7 +76,7 @@ public abstract class AbstractFilter<T, C extends Serializable> implements Filte * Return the parsed value of the cursor from the last request, if it exists */ protected Optional<C> getCursor() { - final C cursor = readCache.getCursor( id, getCursorClass() ); + final C cursor = readCache.getCursor( id, getCursorSerializer() ); return Optional.fromNullable( cursor ); } @@ -89,7 +90,7 @@ public abstract class AbstractFilter<T, C extends Serializable> implements Filte * @param newValue */ protected void setCursor(final C newValue){ - writeCache.setCursor( id, newValue ); + writeCache.setCursor( id, newValue, getCursorSerializer() ); } @@ -104,6 +105,6 @@ public abstract class AbstractFilter<T, C extends Serializable> implements Filte /** * Return the class to be used when parsing the cursor */ - protected abstract Class<C> getCursorClass(); + protected abstract CursorSerializer<C> getCursorSerializer(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java index a843f51..f46a78a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java @@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; import org.apache.usergrid.corepersistence.pipeline.read.CollectorFilter; import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ElasticSearchQueryExecutor; @@ -109,8 +110,8 @@ public abstract class AbstractQueryElasticSearchCollectorFilter extends Abstract @Override - protected Class<Integer> getCursorClass() { - return Integer.class; + protected CursorSerializer<Integer> getCursorSerializer() { + return ElasticsearchCursorSerializer.INSTANCE; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java new file mode 100644 index 0000000..a4e7746 --- /dev/null +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.impl.SimpleEdge; + + +/** + * ElasticSearch cursor serializer + */ +public class ElasticsearchCursorSerializer extends AbstractCursorSerializer<Integer> { + + + public static final ElasticsearchCursorSerializer INSTANCE = new ElasticsearchCursorSerializer(); + + @Override + protected Class<Integer> getType() { + return Integer.class; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java index 5734a5b..8ba5238 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java @@ -22,6 +22,8 @@ package org.apache.usergrid.corepersistence.pipeline.read.entity; import java.io.Serializable; +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.cursor.NoCursorSerializer; import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter; import org.apache.usergrid.persistence.model.entity.Id; @@ -45,15 +47,16 @@ public class EntityIdFilter extends AbstractFilter<Id, Serializable> implements public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;} - @Override - protected Class<Serializable> getCursorClass() { - //no op - return null; - } @Override public Observable<Id> call( final Observable<Id> idObservable ) { return Observable.just( entityId ); } + + + @Override + protected CursorSerializer<Serializable> getCursorSerializer() { + return NoCursorSerializer.create(); + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java index 3b140ae..78a9835 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java @@ -23,6 +23,8 @@ package org.apache.usergrid.corepersistence.pipeline.read.entity; import java.io.Serializable; import java.util.Map; +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.cursor.NoCursorSerializer; import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; import org.apache.usergrid.corepersistence.pipeline.read.CollectorFilter; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; @@ -60,9 +62,9 @@ public class EntityLoadCollectorFilter extends AbstractFilter<Results, Serializa @Override - protected Class<Serializable> getCursorClass() { - return null; - } + protected CursorSerializer<Serializable> getCursorSerializer() { + return NoCursorSerializer.create(); + } @Override http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java index 2e8f041..9f63bd8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java @@ -20,6 +20,8 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph; +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.cursor.NoCursorSerializer; import 
org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter; import org.apache.usergrid.persistence.graph.GraphManager; @@ -54,9 +56,8 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<Id, @Override - protected Class<Id> getCursorClass() { - //no op - return null; + protected CursorSerializer<Id> getCursorSerializer() { + return NoCursorSerializer.create(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java index 06aae83..e6da9c2 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java @@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph; +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter; import org.apache.usergrid.persistence.graph.Edge; @@ -82,8 +83,8 @@ public abstract class AbstractReadGraphFilter extends AbstractFilter<Id, Edge> i @Override - protected Class<Edge> getCursorClass() { - return Edge.class; + protected CursorSerializer<Edge> getCursorSerializer() { + return EdgeCursorSerializer.INSTANCE; } 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java new file mode 100644 index 0000000..769a67e --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.read.graph; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.impl.SimpleEdge; + + +/** + * Edge cursor serializer + */ +public class EdgeCursorSerializer extends AbstractCursorSerializer<Edge> { + + + public static final EdgeCursorSerializer INSTANCE = new EdgeCursorSerializer(); + + @Override + protected Class<SimpleEdge> getType() { + return SimpleEdge.class; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java index d5cdd66..65b02b6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java @@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph; +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter; import org.apache.usergrid.persistence.graph.Edge; @@ -89,7 +90,7 @@ public class ReadGraphConnectionByTypeFilter extends AbstractFilter<Id, Edge> im @Override - protected Class<Edge> getCursorClass() { - return Edge.class; + protected CursorSerializer<Edge> getCursorSerializer() { + return EdgeCursorSerializer.INSTANCE; } } 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java new file mode 100644 index 0000000..fec3e78 --- /dev/null +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.cursor; + + + + +import org.junit.Test; + +import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticsearchCursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.read.graph.EdgeCursorSerializer; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.impl.SimpleEdge; +import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge; + +import com.google.common.base.Optional; + +import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; +import static org.junit.Assert.*; + + +public class CursorTest { + + @Test + public void testCursors(){ + + ResponseCursor responseCursor = new ResponseCursor(); + + + //test encoding edge + + final Edge edge1 = new SimpleEdge( createId("source1"), "edgeType1", createId("target1"), 100 ); + + + final Edge edge2 = new SimpleEdge( createId("source2"), "edgeType2", createId("target2"), 110 ); + + + + final Integer query1 = 10; + + final Integer query2 = 20; + + + responseCursor.setCursor( 0, edge1, EdgeCursorSerializer.INSTANCE ); + + responseCursor.setCursor( 1, query1, ElasticsearchCursorSerializer.INSTANCE ); + + responseCursor.setCursor(2, edge2, EdgeCursorSerializer.INSTANCE); + + responseCursor.setCursor(3, query2, ElasticsearchCursorSerializer.INSTANCE); + + final String cursor = responseCursor.encodeAsString(); + + + + //now parse it + + final RequestCursor requestCursor = new RequestCursor( Optional.of( cursor ) ); + + //get everything else out. We reversed the order because we can, order shouldn't matter. 
+ + + + + final Integer parsedQuery2 = requestCursor.getCursor( 3, ElasticsearchCursorSerializer.INSTANCE ); + + assertEquals(query2, parsedQuery2); + + final Edge parsedEdge2 = requestCursor.getCursor( 2, EdgeCursorSerializer.INSTANCE ); + + assertEquals( edge2, parsedEdge2 ); + + final Integer parsedQuery1 = requestCursor.getCursor( 1, ElasticsearchCursorSerializer.INSTANCE ); + + assertEquals( query1, parsedQuery1 ); + + + final Edge parsedEdge1 = requestCursor.getCursor( 0, EdgeCursorSerializer.INSTANCE ); + + assertEquals(edge1, parsedEdge1); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java index 613585e..6600b3e 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java @@ -94,20 +94,19 @@ public final class OrderedMerge<T> implements Observable.OnSubscribe<T> { innerObservers[i] = inner; } -// TODO, if this merge makes it into 2.0-dev remove this and use 2.0 -dev -// /** -// * Once we're set up, begin the subscription to sub observables -// */ -// for ( int i = 0; i < observables.length; i++ ) { -// //subscribe after setting them up -// //add our subscription to the composite for future cancellation -// Subscription subscription = observables[i].subscribe( innerObservers[i] ); -// -// csub.add( subscription ); -// -// //add the internal composite subscription -// outerOperation.add( csub ); -// } + /** + * Once we're set up, begin the subscription to sub observables + */ + for ( int i 
= 0; i < observables.length; i++ ) { + //subscribe after setting them up + //add our subscription to the composite for future cancellation + Subscription subscription = observables[i].subscribe( innerObservers[i] ); + + csub.add( subscription ); + + //add the internal composite subscription + outerOperation.add( csub ); + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleEdge.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleEdge.java index 8579579..a89cd96 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleEdge.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleEdge.java @@ -31,12 +31,20 @@ import org.apache.usergrid.persistence.model.entity.Id; */ public class SimpleEdge implements Edge { - protected final Id sourceNode; - protected final String type; - protected final Id targetNode; - protected final long timestamp; + protected Id sourceNode; + protected String type; + protected Id targetNode; + protected long timestamp; + /** + * Used for SMILE. Do not remove + */ + @SuppressWarnings( "unused" ) + public SimpleEdge(){ + + } + public SimpleEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp ) { this.sourceNode = sourceNode; this.type = type;