Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev 69610b053 -> ba9e9dbc7


Updated cursor logic


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/154d69d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/154d69d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/154d69d8

Branch: refs/heads/two-dot-o-dev
Commit: 154d69d88edd5c458e359e3e7a0b73e3380f29f4
Parents: ff2b3f1
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Apr 27 14:26:00 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Apr 27 16:11:00 2015 -0600

----------------------------------------------------------------------
 .../cursor/AbstractCursorSerializer.java        |  62 ++++++++++++
 .../pipeline/cursor/CursorParseException.java   |  30 ++++++
 .../pipeline/cursor/CursorSerializer.java       |  58 ++---------
 .../pipeline/cursor/CursorSerializerUtil.java   |  49 +++++++++
 .../pipeline/cursor/NoCursorSerializer.java     |  55 ++++++++++
 .../pipeline/cursor/RequestCursor.java          |  69 ++++++++++++-
 .../pipeline/cursor/ResponseCursor.java         |  60 +++++++++--
 .../pipeline/read/AbstractFilter.java           |   7 +-
 ...stractQueryElasticSearchCollectorFilter.java |   5 +-
 .../ElasticsearchCursorSerializer.java          |  42 ++++++++
 .../pipeline/read/entity/EntityIdFilter.java    |  13 ++-
 .../read/entity/EntityLoadCollectorFilter.java  |   8 +-
 .../graph/AbstractReadGraphEdgeByIdFilter.java  |   7 +-
 .../read/graph/AbstractReadGraphFilter.java     |   5 +-
 .../read/graph/EdgeCursorSerializer.java        |  42 ++++++++
 .../graph/ReadGraphConnectionByTypeFilter.java  |   5 +-
 .../pipeline/cursor/CursorTest.java             | 101 +++++++++++++++++++
 .../persistence/core/rx/OrderedMerge.java       |  27 +++--
 .../persistence/graph/impl/SimpleEdge.java      |  16 ++-
 19 files changed, 562 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
new file mode 100644
index 0000000..23bb99a
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.cursor;
+
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+
+/**
+ * Abstract serializer other serializers should inherit from makes 
serialization easier
+ */
+public abstract class AbstractCursorSerializer<T> implements 
CursorSerializer<T> {
+
+
+    /**
+     * Intentionally protected.  Subclasses should be singletons
+     */
+    protected AbstractCursorSerializer() {
+
+    }
+
+
+    @Override
+    public T fromJsonNode( final JsonNode node, final ObjectMapper 
objectMapper ) {
+        try {
+            final Class<? extends T> classType = getType();
+
+            return objectMapper.treeToValue( node, classType );
+        }
+        catch ( JsonProcessingException e ) {
+            throw new CursorParseException( "Unable to deserialize value", e );
+        }
+    }
+
+
+    @Override
+    public JsonNode toNode( final ObjectMapper objectMapper, final T value ) {
+        return objectMapper.valueToTree( value );
+    }
+
+
+    protected abstract Class<? extends T> getType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorParseException.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorParseException.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorParseException.java
new file mode 100644
index 0000000..b48cce8
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorParseException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.cursor;
+
+
+/**
+ * Thrown when we can't parse a cursor
+ */
+public class CursorParseException extends RuntimeException {
+    public CursorParseException( final String message, final Throwable cause ) 
{
+        super( message, cause );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializer.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializer.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializer.java
index ce2e10a..0f02265 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializer.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializer.java
@@ -20,67 +20,23 @@
 package org.apache.usergrid.corepersistence.pipeline.cursor;
 
 
-import java.io.Serializable;
-
-import com.fasterxml.jackson.core.Base64Variant;
-import com.fasterxml.jackson.core.Base64Variants;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 
 
 /**
- * A utility to serialize objects to/from cursors
+ * Interface for cursor serialization
  */
-public class CursorSerializer {
-
-
-    private static final SmileFactory SMILE_FACTORY = new SmileFactory();
-
-    private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY 
);
-
-    private static final Base64Variant VARIANT = 
Base64Variants.MODIFIED_FOR_URL;
-
+public interface CursorSerializer<T> {
 
     /**
-     * Serialize the serializable object as a cursor
+     * convert from a JsonNode to a cursor of type T
      */
-    public static String asCursor( final Serializable cursor ) {
-
-        try {
-            return MAPPER.writer( VARIANT ).writeValueAsString( cursor );
-        }
-        catch ( JsonProcessingException e ) {
-            throw new CursorParseException( "Unable to serialize cursor", e );
-        }
-    }
-
-
-    /**
-     * Deserialize from the cursor
-     * @param cursor
-     * @return
-     * @throws CursorParseException
-     */
-    public <T extends Serializable> T fromCursor( final String cursor, final 
Class<T> cursorClass ) throws CursorParseException {
-        try {
-
-            final JsonParser parser = MAPPER.getFactory().createParser( cursor 
);
-            return MAPPER.reader( VARIANT ).readValue( parser, cursorClass);
-        }
-        catch ( Exception e ) {
-            throw new CursorParseException( "Unable to serialize cursor", e );
-        }
-    }
+    T fromJsonNode( final JsonNode node, final ObjectMapper objectMapper );
 
 
     /**
-     * Thrown when we can't parse a cursor
+     * Convert the cursor to a jsonNode
      */
-    public static class CursorParseException extends RuntimeException {
-        public CursorParseException( final String message, final Throwable 
cause ) {
-            super( message, cause );
-        }
-    }
+    JsonNode toNode( final ObjectMapper objectMapper, final T value );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
new file mode 100644
index 0000000..05c1018
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.cursor;
+
+
+import com.fasterxml.jackson.core.Base64Variant;
+import com.fasterxml.jackson.core.Base64Variants;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+
+
+/**
+ * A utility to serialize objects to/from cursors
+ */
+public class CursorSerializerUtil {
+
+    private static final SmileFactory SMILE_FACTORY = new SmileFactory();
+
+    private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY 
);
+
+    private static final Base64Variant VARIANT = 
Base64Variants.MODIFIED_FOR_URL;
+
+
+    public static ObjectMapper getMapper() {
+        return MAPPER;
+    }
+
+
+    public static Base64Variant getBase64() {
+        return VARIANT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java
new file mode 100644
index 0000000..1d42df4
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.cursor;
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+
+/**
+ * Interface for cursor serialization
+ *
+ * TODO, the need for this seems to indicate an issue with our object 
composition.  Refactor this away
+ */
+public class NoCursorSerializer<T> implements CursorSerializer<T> {
+
+    private static final NoCursorSerializer<Object> INSTANCE = new 
NoCursorSerializer<>();
+
+
+    @Override
+    public T fromJsonNode( final JsonNode node, final ObjectMapper 
objectMapper ) {
+        return null;
+    }
+
+
+    @Override
+    public JsonNode toNode( final ObjectMapper objectMapper, final T value ) {
+        return objectMapper.createObjectNode();
+    }
+
+
+    /**
+     * convenience for type casting
+     */
+    public static <T> NoCursorSerializer<T> create() {
+        return ( NoCursorSerializer<T> ) INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
index 99cfd74..b117c21 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
@@ -20,9 +20,16 @@
 package org.apache.usergrid.corepersistence.pipeline.cursor;
 
 
-import java.io.Serializable;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
+import com.fasterxml.jackson.core.Base64Variant;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 
 
 /**
@@ -30,16 +37,70 @@ import com.google.common.base.Optional;
  */
 public class RequestCursor {
 
-    public RequestCursor(final Optional<String> cursor){
+    /**
+     * Aritrary number, just meant to keep us from having a DOS issue
+     */
+    private static final int MAX_SIZE = 1024;
+
+    private static final int MAX_CURSOR_COUNT = 100;
+
+    private static final ObjectMapper MAPPER = 
CursorSerializerUtil.getMapper();
+    private static final Base64Variant VARIANT = 
CursorSerializerUtil.getBase64();
+
+    private final Map<Integer, JsonNode> parsedCursor;
+
 
+    public RequestCursor( final Optional<String> cursor ) {
+        if ( cursor.isPresent() ) {
+            parsedCursor = fromCursor( cursor.get() );
+        }
+        else {
+            parsedCursor = Collections.EMPTY_MAP;
+        }
     }
 
 
     /**
      * Get the cursor with the specified id
      */
-    public <T extends Serializable> T getCursor( final int id, final Class<T> 
cursorType ) {
-        return null;
+    public <T> T getCursor( final int id, final CursorSerializer<T> serializer 
) {
+
+        final JsonNode node = parsedCursor.get( id );
+
+        return serializer.fromJsonNode( node, MAPPER );
     }
 
+
+    /**
+     * Deserialize from the cursor as json nodes
+     */
+    private Map<Integer, JsonNode> fromCursor( final String cursor ) throws 
CursorParseException {
+        try {
+
+
+            Preconditions.checkArgument( cursor.length() <= MAX_SIZE, "Your 
cursor must be less than " + MAX_SIZE + " chars in length");
+
+            final byte[] data = Base64.getUrlDecoder().decode( cursor );
+
+            JsonNode jsonNode = MAPPER.readTree( data );
+
+
+            Preconditions
+                .checkArgument( jsonNode.size() <= MAX_CURSOR_COUNT, " You 
cannot have more than " + MAX_CURSOR_COUNT + " cursors" );
+
+
+            Map<Integer, JsonNode> cursors = new HashMap<>();
+
+            final Iterable<Map.Entry<String, JsonNode>> iterable = () -> 
jsonNode.fields();
+
+            for ( final Map.Entry<String, JsonNode> node : iterable ) {
+                cursors.put( Integer.parseInt( node.getKey() ), 
node.getValue() );
+            }
+
+            return cursors;
+        }
+        catch ( Exception e ) {
+            throw new CursorParseException( "Unable to serialize cursor", e );
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
index 0e047ed..e379a34 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
@@ -21,9 +21,16 @@ package org.apache.usergrid.corepersistence.pipeline.cursor;
 
 
 import java.io.Serializable;
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.fasterxml.jackson.core.Base64Variant;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 
 /**
  * A cursor used in rendering a response
@@ -31,26 +38,67 @@ import java.util.Map;
 public class ResponseCursor {
 
 
+    private static final ObjectMapper MAPPER = 
CursorSerializerUtil.getMapper();
+    private static final Base64Variant VARIANT = 
CursorSerializerUtil.getBase64();
 
     /**
      * We use a map b/c some indexes might be skipped
      */
-    private Map<Integer, ? super Serializable> cursors = new HashMap<>();
+    private Map<Integer, CursorEntry<?>> cursors = new HashMap<>();
+
 
     /**
      * Set the possible cursor value into the index. DOES NOT parse the 
cursor.  This is intentional for performance
      */
-    public <T extends Serializable> void setCursor( final int id, final T 
cursor ) {
-        cursors.put( id, cursor );
+    public <T extends Serializable> void setCursor( final int id, final T 
cursor,
+                                                    final CursorSerializer<T> 
serializer ) {
+
+        final CursorEntry<T> newEntry = new CursorEntry<>( cursor, serializer 
);
+        cursors.put( id, newEntry );
     }
 
 
-    private void ensureCapacity() {
+    /**
+     * now we're done, encode as a string
+     */
+    public String encodeAsString() {
+        try {
+            final ObjectNode map = MAPPER.createObjectNode();
+
+            for ( Map.Entry<Integer, CursorEntry<?>> entry : 
cursors.entrySet() ) {
+
+                final CursorEntry cursorEntry = entry.getValue();
+
+                final JsonNode serialized = cursorEntry.serializer.toNode( 
MAPPER, cursorEntry.cursor );
 
+                map.put( entry.getKey().toString(), serialized );
+            }
+
+
+            final byte[] output = MAPPER.writeValueAsBytes(map);
+
+            //generate a base64 url save string
+            return Base64.getUrlEncoder().encodeToString( output );
+//            return MAPPER.writer( VARIANT ).writeValueAsString( map );
+
+        }
+        catch ( JsonProcessingException e ) {
+            throw new CursorParseException( "Unable to serialize cursor", e );
+        }
     }
 
 
-    public String encodeAsString() {
-        return null;
+    /**
+     * Interal pointer to the cursor and it's serialzed value
+     */
+    private static final class CursorEntry<T> {
+        private final T cursor;
+        private final CursorSerializer<T> serializer;
+
+
+        private CursorEntry( final T cursor, final CursorSerializer<T> 
serializer ) {
+            this.cursor = cursor;
+            this.serializer = serializer;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
index a6e7ef1..3564a79 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read;
 
 import java.io.Serializable;
 
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
 import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -75,7 +76,7 @@ public abstract class AbstractFilter<T, C extends 
Serializable> implements Filte
      * Return the parsed value of the cursor from the last request, if it 
exists
      */
     protected Optional<C> getCursor() {
-        final C cursor = readCache.getCursor( id, getCursorClass() );
+        final C cursor = readCache.getCursor( id, getCursorSerializer() );
 
         return Optional.fromNullable( cursor );
     }
@@ -89,7 +90,7 @@ public abstract class AbstractFilter<T, C extends 
Serializable> implements Filte
      * @param newValue
      */
     protected void setCursor(final C newValue){
-        writeCache.setCursor( id, newValue );
+        writeCache.setCursor( id, newValue,  getCursorSerializer() );
     }
 
 
@@ -104,6 +105,6 @@ public abstract class AbstractFilter<T, C extends 
Serializable> implements Filte
     /**
      * Return the class to be used when parsing the cursor
      */
-    protected abstract Class<C> getCursorClass();
+    protected abstract CursorSerializer<C> getCursorSerializer();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java
index a843f51..f46a78a 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
 
 
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.CollectorFilter;
 import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ElasticSearchQueryExecutor;
@@ -109,8 +110,8 @@ public abstract class 
AbstractQueryElasticSearchCollectorFilter extends Abstract
 
 
     @Override
-    protected Class<Integer> getCursorClass() {
-        return Integer.class;
+    protected CursorSerializer<Integer> getCursorSerializer() {
+        return ElasticsearchCursorSerializer.INSTANCE;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java
new file mode 100644
index 0000000..a4e7746
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import 
org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+
+
+/**
+ * ElasticSearch cursor serializer
+ */
+public class ElasticsearchCursorSerializer extends 
AbstractCursorSerializer<Integer> {
+
+
+    public static final ElasticsearchCursorSerializer INSTANCE = new 
ElasticsearchCursorSerializer();
+
+    @Override
+    protected Class<Integer> getType() {
+        return Integer.class;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
index 5734a5b..8ba5238 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
@@ -22,6 +22,8 @@ package 
org.apache.usergrid.corepersistence.pipeline.read.entity;
 
 import java.io.Serializable;
 
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.cursor.NoCursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -45,15 +47,16 @@ public class EntityIdFilter extends AbstractFilter<Id, 
Serializable> implements
     public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = 
entityId;}
 
 
-    @Override
-    protected Class<Serializable> getCursorClass() {
-        //no op
-        return null;
-    }
 
 
     @Override
     public Observable<Id> call( final Observable<Id> idObservable ) {
         return Observable.just( entityId );
     }
+
+
+    @Override
+    protected CursorSerializer<Serializable> getCursorSerializer() {
+        return NoCursorSerializer.create();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
index 3b140ae..78a9835 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
@@ -23,6 +23,8 @@ package 
org.apache.usergrid.corepersistence.pipeline.read.entity;
 import java.io.Serializable;
 import java.util.Map;
 
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.cursor.NoCursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.CollectorFilter;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
@@ -60,9 +62,9 @@ public class EntityLoadCollectorFilter extends 
AbstractFilter<Results, Serializa
 
 
     @Override
-    protected Class<Serializable> getCursorClass() {
-        return null;
-    }
+      protected CursorSerializer<Serializable> getCursorSerializer() {
+          return NoCursorSerializer.create();
+      }
 
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
index 2e8f041..9f63bd8 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.cursor.NoCursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
 import org.apache.usergrid.persistence.graph.GraphManager;
@@ -54,9 +56,8 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends 
AbstractFilter<Id,
 
 
     @Override
-    protected Class<Id> getCursorClass() {
-        //no op
-        return null;
+    protected CursorSerializer<Id> getCursorSerializer() {
+        return NoCursorSerializer.create();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
index 06aae83..e6da9c2 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -82,8 +83,8 @@ public abstract class AbstractReadGraphFilter extends 
AbstractFilter<Id, Edge> i
 
 
     @Override
-    protected Class<Edge> getCursorClass() {
-        return Edge.class;
+    protected CursorSerializer<Edge> getCursorSerializer() {
+        return EdgeCursorSerializer.INSTANCE;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java
new file mode 100644
index 0000000..769a67e
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import 
org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+
+
+/**
+ * Edge cursor serializer
+ */
+public class EdgeCursorSerializer extends AbstractCursorSerializer<Edge> {
+
+
+    public static final EdgeCursorSerializer INSTANCE = new 
EdgeCursorSerializer();
+
+    @Override
+    protected Class<SimpleEdge> getType() {
+        return SimpleEdge.class;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
index d5cdd66..65b02b6 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -89,7 +90,7 @@ public class ReadGraphConnectionByTypeFilter extends 
AbstractFilter<Id, Edge> im
 
 
     @Override
-    protected Class<Edge> getCursorClass() {
-        return Edge.class;
+    protected CursorSerializer<Edge> getCursorSerializer() {
+        return EdgeCursorSerializer.INSTANCE;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
new file mode 100644
index 0000000..fec3e78
--- /dev/null
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.cursor;
+
+
+
+
+import org.junit.Test;
+
+import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticsearchCursorSerializer;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.EdgeCursorSerializer;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
+
+import com.google.common.base.Optional;
+
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.*;
+
+
+public class CursorTest {
+
+    @Test
+    public void testCursors(){
+
+        ResponseCursor responseCursor = new ResponseCursor();
+
+
+        //test encoding edge
+
+        final Edge edge1 = new SimpleEdge( createId("source1"), "edgeType1",  
createId("target1"), 100  );
+
+
+        final Edge edge2 = new SimpleEdge( createId("source2"), "edgeType2",  
createId("target2"), 110  );
+
+
+
+        final Integer query1 = 10;
+
+        final Integer query2 = 20;
+
+
+        responseCursor.setCursor( 0, edge1, EdgeCursorSerializer.INSTANCE );
+
+        responseCursor.setCursor( 1, query1, 
ElasticsearchCursorSerializer.INSTANCE );
+
+        responseCursor.setCursor(2, edge2, EdgeCursorSerializer.INSTANCE);
+
+        responseCursor.setCursor(3, query2, 
ElasticsearchCursorSerializer.INSTANCE);
+
+        final String cursor = responseCursor.encodeAsString();
+
+
+
+        //now parse it
+
+        final RequestCursor requestCursor = new RequestCursor( Optional.of( 
cursor ) );
+
+        //get everything else out.  We reversed the order for because we can, 
order shouldn't matter.
+
+
+
+
+        final Integer parsedQuery2 = requestCursor.getCursor( 3, 
ElasticsearchCursorSerializer.INSTANCE );
+
+        assertEquals(query2, parsedQuery2);
+
+        final Edge parsedEdge2 = requestCursor.getCursor( 2, 
EdgeCursorSerializer.INSTANCE );
+
+        assertEquals( edge2, parsedEdge2 );
+
+        final Integer parsedQuery1 = requestCursor.getCursor( 1, 
ElasticsearchCursorSerializer.INSTANCE );
+
+        assertEquals( query1, parsedQuery1 );
+
+
+        final Edge parsedEdge1 = requestCursor.getCursor( 0, 
EdgeCursorSerializer.INSTANCE );
+
+        assertEquals(edge1, parsedEdge1);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java
index 613585e..6600b3e 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java
@@ -94,20 +94,19 @@ public final class OrderedMerge<T> implements 
Observable.OnSubscribe<T> {
 
             innerObservers[i] = inner;
         }
-// TODO, if this merge makes it into 2.0-dev remove this and use 2.0 -dev
-//        /**
-//         * Once we're set up, begin the subscription to sub observables
-//         */
-//        for ( int i = 0; i < observables.length; i++ ) {
-//            //subscribe after setting them up
-//            //add our subscription to the composite for future cancellation
-//            Subscription subscription = observables[i].subscribe( 
innerObservers[i] );
-//
-//            csub.add( subscription );
-//
-//            //add the internal composite subscription
-//            outerOperation.add( csub );
-//        }
+        /**
+         * Once we're set up, begin the subscription to sub observables
+         */
+        for ( int i = 0; i < observables.length; i++ ) {
+            //subscribe after setting them up
+            //add our subscription to the composite for future cancellation
+            Subscription subscription = observables[i].subscribe( 
innerObservers[i] );
+
+            csub.add( subscription );
+
+            //add the internal composite subscription
+            outerOperation.add( csub );
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/154d69d8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleEdge.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleEdge.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleEdge.java
index 8579579..a89cd96 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleEdge.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleEdge.java
@@ -31,12 +31,20 @@ import org.apache.usergrid.persistence.model.entity.Id;
  */
 public class SimpleEdge implements Edge {
 
-    protected final Id sourceNode;
-    protected final String type;
-    protected final Id targetNode;
-    protected final long timestamp;
+    protected Id sourceNode;
+    protected String type;
+    protected Id targetNode;
+    protected long timestamp;
 
 
+    /**
+     * Used for SMILE.  Do not remove
+     */
+    @SuppressWarnings( "unused" )
+    public SimpleEdge(){
+
+    }
+
     public SimpleEdge( final Id sourceNode, final String type, final Id 
targetNode, final long timestamp ) {
         this.sourceNode = sourceNode;
         this.type = type;

Reply via email to