Finishes changes before tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/20c9b350 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/20c9b350 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/20c9b350 Branch: refs/heads/USERGRID-608 Commit: 20c9b3509cf96a6ecab1a45a2c572fd6a041e00d Parents: cb179d3 Author: Todd Nine <tn...@apigee.com> Authored: Thu May 14 17:19:27 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Thu May 14 17:19:27 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/InMemoryAsyncEventService.java | 2 - .../index/EdgeScopeSerializer.java | 41 ++++ .../index/IndexProcessorFig.java | 6 +- .../corepersistence/index/ReIndexService.java | 75 +++--- .../index/ReIndexServiceImpl.java | 226 +++++++++++++++---- .../pipeline/cursor/CursorSerializerUtil.java | 54 ++++- .../pipeline/cursor/RequestCursor.java | 9 +- .../pipeline/cursor/ResponseCursor.java | 49 ++-- .../pipeline/read/AbstractPathFilter.java | 30 --- .../pipeline/read/CursorSeek.java | 53 +++++ .../rx/impl/AllEntityIdsObservable.java | 4 +- .../rx/impl/AllEntityIdsObservableImpl.java | 5 +- .../PerformanceEntityRebuildIndexTest.java | 5 +- .../graph/serialization/EdgesObservable.java | 21 +- .../serialization/impl/EdgesObservableImpl.java | 4 +- 15 files changed, 422 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java 
index 96966bf..ddcf826 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java @@ -99,8 +99,6 @@ public class InMemoryAsyncEventService implements AsyncEventService { @Override public void index( final EntityIndexOperation entityIndexOperation ) { - - run(eventBuilder.index( entityIndexOperation )); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java new file mode 100644 index 0000000..2a6a5ac --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.index; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer; +import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; + + +/** + * Serialize our edge scope for cursors + */ +public class EdgeScopeSerializer extends AbstractCursorSerializer<EdgeScope> { + + + public static final EdgeScopeSerializer INSTANCE = new EdgeScopeSerializer(); + + @Override + protected Class<EdgeScope> getType() { + return EdgeScope.class; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java index fe9d3fd..8e835e2 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java @@ -78,9 +78,9 @@ public interface IndexProcessorFig extends GuicyFig { String getQueueImplementation(); - @Default("30000") - @Key("elasticsearch.reindex.sample.interval") - long getReIndexSampleInterval(); + @Default("10000") + @Key("elasticsearch.reindex.flush.interval") + int getUpdateInterval(); @Default("false") http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java index b25eca5..f8955dd 100644 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java @@ -20,22 +20,6 @@ package org.apache.usergrid.corepersistence.index; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.FutureTask; - -import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; -import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; - -import com.google.common.base.Optional; - -import rx.Observable; -import rx.Observer; -import rx.observables.ConnectableObservable; - - /** * An interface for re-indexing all entities in an application */ @@ -46,48 +30,75 @@ public interface ReIndexService { * Perform an index rebuild * * @param indexServiceRequestBuilder The builder to build the request - * @return */ - IndexResponse rebuildIndex(final IndexServiceRequestBuilder indexServiceRequestBuilder); + IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ); /** * Generate a build for the index - * @return */ IndexServiceRequestBuilder getBuilder(); + + /** + * Get the status of a job + * @param jobId The jobId returned during the rebuild index + * @return + */ + IndexResponse getStatus( final String jobId ); + + /** * The response when requesting a re-index operation */ class IndexResponse { - final String cursor; - final ConnectableObservable<EdgeScope> indexedEdgecount; + final String jobId; + final String status; + final long numberProcessed; + final long lastUpdated; + + + public IndexResponse( final String jobId, final String status, final long numberProcessed, + final long lastUpdated ) { + this.jobId = jobId; + this.status = status; + this.numberProcessed = numberProcessed; + this.lastUpdated = lastUpdated; + } - 
public IndexResponse( final String cursor, final ConnectableObservable<EdgeScope> indexedEdgecount ) { - this.cursor = cursor; - this.indexedEdgecount = indexedEdgecount; + /** + * Get the jobId used to resume this operation + */ + public String getJobId() { + return jobId; + } + + + /** + * Get the last updated time, as a long + * @return + */ + public long getLastUpdated() { + return lastUpdated; } /** - * Get the cursor used to resume this operation + * Get the number of records processed * @return */ - public String getCursor() { - return cursor; + public long getNumberProcessed() { + return numberProcessed; } /** - * Return the observable of all edges to be indexed. - * - * Note that after subscribing "connect" will need to be called to ensure that processing begins + * Get the status * @return */ - public ConnectableObservable<EdgeScope> getCount() { - return indexedEdgecount; + public String getStatus() { + return status; } } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java index a2fa09a..d828fc2 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -20,29 +20,32 @@ package org.apache.usergrid.corepersistence.index; -import java.util.concurrent.TimeUnit; +import java.util.List; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil; +import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek; import 
org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable; import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.util.StringUtils; +import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.map.MapManager; import org.apache.usergrid.persistence.map.MapManagerFactory; import org.apache.usergrid.persistence.map.MapScope; import org.apache.usergrid.persistence.map.impl.MapScopeImpl; import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.google.inject.Singleton; import rx.Observable; -import rx.observables.ConnectableObservable; +import rx.schedulers.Schedulers; @Singleton @@ -51,14 +54,18 @@ public class ReIndexServiceImpl implements ReIndexService { private static final MapScope RESUME_MAP_SCOPE = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" ); - //Keep cursors to resume re-index for 1 day. This is far beyond it's useful real world implications anyway. + //Keep cursors to resume re-index for 10 days. This is far beyond it's useful real world implications anyway. 
private static final int INDEX_TTL = 60 * 60 * 24 * 10; + private static final String MAP_CURSOR_KEY = "cursor"; + private static final String MAP_COUNT_KEY = "count"; + private static final String MAP_STATUS_KEY = "status"; + private static final String MAP_UPDATED_KEY = "lastUpdated"; + private final AllApplicationsObservable allApplicationsObservable; private final AllEntityIdsObservable allEntityIdsObservable; private final IndexProcessorFig indexProcessorFig; - private final RxTaskScheduler rxTaskScheduler; private final MapManager mapManager; private final AsyncEventService indexService; @@ -66,69 +73,61 @@ public class ReIndexServiceImpl implements ReIndexService { @Inject public ReIndexServiceImpl( final AllEntityIdsObservable allEntityIdsObservable, final MapManagerFactory mapManagerFactory, - final AllApplicationsObservable allApplicationsObservable, final IndexProcessorFig indexProcessorFig, - final RxTaskScheduler rxTaskScheduler, final AsyncEventService indexService ) { + final AllApplicationsObservable allApplicationsObservable, + final IndexProcessorFig indexProcessorFig, final AsyncEventService indexService ) { this.allEntityIdsObservable = allEntityIdsObservable; this.allApplicationsObservable = allApplicationsObservable; this.indexProcessorFig = indexProcessorFig; - this.rxTaskScheduler = rxTaskScheduler; this.indexService = indexService; this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE ); } - - - @Override public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) { - //load our last emitted Scope if a cursor is present - if ( indexServiceRequestBuilder.getCursor().isPresent() ) { - throw new UnsupportedOperationException( "Build this" ); - } + //load our last emitted Scope if a cursor is present + final Optional<EdgeScope> cursor = parseCursor( indexServiceRequestBuilder.getCursor() ); + + + final CursorSeek<Edge> cursorSeek = getResumeEdge( cursor ); final Optional<ApplicationScope> 
appId = indexServiceRequestBuilder.getApplicationScope(); - final Observable<ApplicationScope> applicationScopes = appId.isPresent()? Observable.just( appId.get() ) : allApplicationsObservable.getData(); + Preconditions.checkArgument( cursor.isPresent() && appId.isPresent(), + "You cannot specify an app id and a cursor. When resuming with cursor you must omit the appid" ); + + final Observable<ApplicationScope> applicationScopes = getApplications( cursor, appId ); - final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() ); + final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() ); final long modifiedSince = indexServiceRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE ); //create an observable that loads each entity and indexes it, start it running with publish - final ConnectableObservable<EdgeScope> runningReIndex = - allEntityIdsObservable.getEdgesToEntities( applicationScopes, - indexServiceRequestBuilder.getCollectionName() ) - - //for each edge, create our scope and index on it - .doOnNext( edge -> indexService.index( - new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(), - modifiedSince ) ) ).publish(); + final Observable<EdgeScope> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes, + indexServiceRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() ) + //for each edge, create our scope and index on it + .doOnNext( edge -> indexService.index( + new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(), + modifiedSince ) ) ); //start our sampler and state persistence //take a sample every sample interval to allow us to resume state with minimal loss - runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS, - rxTaskScheduler.getAsyncIOScheduler() ) - .doOnNext( edge -> { - -// final String serializedState = SerializableMapper.asString( edge ); -// -// mapManager.putString( 
newCursor, serializedState, INDEX_TTL ); - } ).subscribe(); + runningReIndex.buffer( indexProcessorFig.getUpdateInterval() ) + //create our flushing collector and flush the edge scopes to it + .collect( () -> new FlushingCollector( jobId ), + ( ( flushingCollector, edgeScopes ) -> flushingCollector.flushBuffer( edgeScopes ) ) ).doOnNext( flushingCollector-> flushingCollector.complete() ) + //subscribe on our I/O scheduler and run the task + .subscribeOn( Schedulers.io() ).subscribe(); - //start pushing to both - runningReIndex.connect(); - - - return new IndexResponse( newCursor, runningReIndex ); + return new IndexResponse( jobId, "Started", 0, 0 ); } @@ -136,6 +135,155 @@ public class ReIndexServiceImpl implements ReIndexService { public IndexServiceRequestBuilder getBuilder() { return new IndexServiceRequestBuilderImpl(); } + + + @Override + public IndexResponse getStatus( final String jobId ) { + Preconditions.checkNotNull( jobId, "jobId must not be null" ); + return getIndexResponse( jobId ); + } + + + /** + * Simple collector that counts state, then flushed every time a buffer is provided. 
Writes final state when complete + */ + private class FlushingCollector { + + private final String jobId; + private long count; + + + private FlushingCollector( final String jobId ) { + this.jobId = jobId; + } + + + public void flushBuffer( final List<EdgeScope> buffer ) { + count += buffer.size(); + + //write our cursor state + if ( buffer.size() > 0 ) { + writeCursorState( jobId, buffer.get( buffer.size() - 1 ) ); + } + + writeStateMeta( jobId, "InProgress", count, System.currentTimeMillis() ); + } + + public void complete(){ + writeStateMeta( jobId, "Complete", count, System.currentTimeMillis() ); + } + } + + + /** + * Get the resume edge scope + * + * @param edgeScope The optional edge scope from the cursor + */ + private CursorSeek<Edge> getResumeEdge( final Optional<EdgeScope> edgeScope ) { + + + if ( edgeScope.isPresent() ) { + return new CursorSeek<>( Optional.of( edgeScope.get().getEdge() ) ); + } + + return new CursorSeek<>( Optional.absent() ); + } + + + /** + * Generate an observable for our application scope + */ + private Observable<ApplicationScope> getApplications( final Optional<EdgeScope> cursor, + final Optional<ApplicationScope> appId ) { + //cursor is present use it and skip until we hit that app + if ( cursor.isPresent() ) { + + final EdgeScope cursorValue = cursor.get(); + //we have a cursor and an application scope that was used. + return allApplicationsObservable.getData().skipWhile( + applicationScope -> !cursorValue.getApplicationScope().equals( applicationScope ) ); + } + //this is intentional. 
If a cursor is present we resume from it and never consult the app id + else if ( appId.isPresent() ) { + return Observable.just( appId.get() ); + } + + return allApplicationsObservable.getData(); + } + + + /** + * Swap our cursor for an optional edgescope + */ + private Optional<EdgeScope> parseCursor( final Optional<String> cursor ) { + + if ( !cursor.isPresent() ) { + return Optional.absent(); + } + + //get our cursor + final String persistedCursor = mapManager.getString( cursor.get() ); + + if ( persistedCursor == null ) { + return Optional.absent(); + } + + final JsonNode node = CursorSerializerUtil.fromString( persistedCursor ); + + final EdgeScope edgeScope = EdgeScopeSerializer.INSTANCE.fromJsonNode( node, CursorSerializerUtil.getMapper() ); + + return Optional.of( edgeScope ); + } + + + /** + * Write the cursor state to the map in Cassandra + */ + private void writeCursorState( final String jobId, final EdgeScope edge ) { + + final JsonNode node = EdgeScopeSerializer.INSTANCE.toNode( CursorSerializerUtil.getMapper(), edge ); + + final String serializedState = CursorSerializerUtil.asString( node ); + + mapManager.putString( jobId + MAP_CURSOR_KEY, serializedState, INDEX_TTL ); + } + + + /** + * Write our state meta data into Cassandra so everyone can see it + * @param jobId + * @param status + * @param processedCount + * @param lastUpdated + */ + private void writeStateMeta( final String jobId, final String status, final long processedCount, + final long lastUpdated ) { + + mapManager.putString( jobId + MAP_STATUS_KEY, status ); + mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount ); + mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated ); + } + + + /** + * Get the index response from the jobId + * @param jobId + * @return + */ + private IndexResponse getIndexResponse( final String jobId ) { + + final String status = mapManager.getString( jobId+MAP_STATUS_KEY ); + + if(status == null){ + throw new IllegalArgumentException( "Could not find a job with id " + jobId ); + } + + final long processedCount = 
mapManager.getLong( jobId + MAP_COUNT_KEY ); + final long lastUpdated = mapManager.getLong( jobId + MAP_COUNT_KEY ); + + return new IndexResponse( jobId, status, processedCount, lastUpdated ); + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java index fea0364..7acdd00 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java @@ -20,10 +20,15 @@ package org.apache.usergrid.corepersistence.pipeline.cursor; -import com.fasterxml.jackson.core.Base64Variant; -import com.fasterxml.jackson.core.Base64Variants; +import java.io.IOException; +import java.util.Base64; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Preconditions; /** @@ -35,9 +40,54 @@ public class CursorSerializerUtil { private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY ); + /** + * Aritrary number, just meant to keep us from having a DOS issue + */ + private static final int MAX_SIZE = 1024; + public static ObjectMapper getMapper() { return MAPPER; } + + /** + * Turn the json node in to a base64 encoded SMILE binary + */ + public static String asString( final JsonNode node ) { + final byte[] output; + try { + output = MAPPER.writeValueAsBytes( node ); 
+ } + catch ( JsonProcessingException e ) { + throw new RuntimeException( "Unable to create output from json node " + node ); + } + + //generate a base64 url save string + final String value = Base64.getUrlEncoder().encodeToString( output ); + + return value; + } + + + /** + * Parse the base64 encoded binary string + */ + public static JsonNode fromString( final String base64EncodedJson ) { + + Preconditions.checkArgument( base64EncodedJson.length() <= MAX_SIZE, + "Your cursor must be less than " + MAX_SIZE + " chars in length" ); + + final byte[] data = Base64.getUrlDecoder().decode( base64EncodedJson ); + + JsonNode jsonNode; + try { + jsonNode = MAPPER.readTree( data ); + } + catch ( IOException e ) { + throw new RuntimeException( "Unable to parse json node from string " + base64EncodedJson ); + } + + return jsonNode; + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java index 870edbb..dc6ae71 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java @@ -37,10 +37,6 @@ import com.google.common.base.Preconditions; */ public class RequestCursor { - /** - * Aritrary number, just meant to keep us from having a DOS issue - */ - private static final int MAX_SIZE = 1024; private static final int MAX_CURSOR_COUNT = 100; @@ -83,11 +79,8 @@ public class RequestCursor { try { - Preconditions.checkArgument( cursor.length() <= MAX_SIZE, "Your cursor must be less than " + MAX_SIZE + " chars in length"); - - final byte[] data = 
Base64.getUrlDecoder().decode( cursor ); - JsonNode jsonNode = MAPPER.readTree( data ); + JsonNode jsonNode = CursorSerializerUtil.fromString( cursor ); Preconditions http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java index dbd8b88..dc4bf39 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java @@ -20,11 +20,8 @@ package org.apache.usergrid.corepersistence.pipeline.cursor; -import java.util.Base64; - import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -53,8 +50,8 @@ public class ResponseCursor { /** - * Lazyily encoded deliberately. If the user doesn't care about a cursor and is using streams, we dont' want to take the - * time to calculate it + * Lazyily encoded deliberately. 
If the user doesn't care about a cursor and is using streams, we don't want to + * take the time to calculate it */ public Optional<String> encodeAsString() { @@ -68,42 +65,34 @@ public class ResponseCursor { return encodedValue; } + //no edge path, short circuit - try { - - //no edge path, short circuit - - final ObjectNode map = MAPPER.createObjectNode(); + final ObjectNode map = MAPPER.createObjectNode(); - Optional<EdgePath> current = edgePath; + Optional<EdgePath> current = edgePath; - //traverse each edge and add them to our json - do { + //traverse each edge and add them to our json + do { - final EdgePath edgePath = current.get(); - final Object cursorValue = edgePath.getCursorValue(); - final CursorSerializer serializer = edgePath.getSerializer(); - final int filterId = edgePath.getFilterId(); + final EdgePath edgePath = current.get(); + final Object cursorValue = edgePath.getCursorValue(); + final CursorSerializer serializer = edgePath.getSerializer(); + final int filterId = edgePath.getFilterId(); - final JsonNode serialized = serializer.toNode( MAPPER, cursorValue ); - map.put( String.valueOf( filterId ), serialized ); + final JsonNode serialized = serializer.toNode( MAPPER, cursorValue ); + map.put( String.valueOf( filterId ), serialized ); - current = current.get().getPrevious(); - } - while ( current.isPresent() ); + current = current.get().getPrevious(); + } + while ( current.isPresent() ); - final byte[] output = MAPPER.writeValueAsBytes( map ); + //generate a base64 url safe string + final String value = CursorSerializerUtil.asString( map ); - //generate a base64 url save string - final String value = Base64.getUrlEncoder().encodeToString( output ); + encodedValue = Optional.of( value ); - encodedValue = Optional.of( value ); - } - catch ( JsonProcessingException e ) { - throw new CursorParseException( "Unable to serialize cursor", e ); - } return encodedValue; } 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java index c68dc4a..0f9ac9b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java @@ -76,34 +76,4 @@ public abstract class AbstractPathFilter<T, R, C extends Serializable> extends A * Return the class to be used when parsing the cursor */ protected abstract CursorSerializer<C> getCursorSerializer(); - - - /** - * An internal class that holds a mutable state. When resuming, we only ever honor the seek value on the first call. Afterwards, we will seek from the beginning on newly emitted values. - * Calling get will return the first value to seek, or absent if not specified. Subsequent calls will return absent. 
Callers should treat the results as seek values for each operation - */ - protected static class CursorSeek<C> { - - private Optional<C> seek; - - private CursorSeek(final Optional<C> cursorValue){ - seek = cursorValue; - } - - - /** - * Get the seek value to use when searching - * @return - */ - public Optional<C> getSeekValue(){ - final Optional<C> toReturn = seek; - - seek = Optional.absent(); - - return toReturn; - } - - - - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java new file mode 100644 index 0000000..b803658 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import com.google.common.base.Optional; + + +/** + * An internal class that holds a mutable state. When resuming, we only ever honor the seek value on the first call. Afterwards, we will seek from the beginning on newly emitted values. + * Calling get will return the first value to seek, or absent if not specified. Subsequent calls will return absent. Callers should treat the results as seek values for each operation + */ +public class CursorSeek<C> { + + private Optional<C> seek; + + public CursorSeek( final Optional<C> cursorValue ){ + seek = cursorValue; + } + + + /** + * Get the seek value to use when searching + * @return + */ + public Optional<C> getSeekValue(){ + final Optional<C> toReturn = seek; + + seek = Optional.absent(); + + return toReturn; + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java index aada240..9070609 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java @@ -24,6 +24,7 @@ import com.google.common.base.Optional; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; import rx.Observable; @@ -44,8 +45,9 @@ public interface AllEntityIdsObservable { * Get all edges that represent edges to entities in the system * @param appScopes * @param edgeType The edge 
type to use (if specified) + * @param lastEdge The edge to resume processing from * @return */ - Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType); + Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java index 6a95e7b..0420a32 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java @@ -28,6 +28,7 @@ import com.google.inject.Singleton; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.serialization.EdgesObservable; @@ -81,12 +82,12 @@ public class AllEntityIdsObservableImpl implements AllEntityIdsObservable { @Override - public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType) { + public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge) { return appScopes.flatMap( applicationScope -> { 
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); - return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType ) + return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType, lastEdge ) .map( edge -> new EdgeScope(applicationScope, edge )); } ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java index a17c925..cb9919f 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Optional; import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder; import org.apache.usergrid.corepersistence.index.ReIndexService; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; import org.junit.After; @@ -196,7 +197,9 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT { try { - reIndexService.rebuildIndex( Optional.of( em.getApplicationId()), Optional.<String>of("catherders"), Optional.absent(), Optional.absent() ); + final IndexServiceRequestBuilder builder = reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" ); + + reIndexService.rebuildIndex(builder ); reporter.report(); registry.remove( meterName ); 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java index 964e13d..78a1d4b 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java @@ -42,16 +42,6 @@ public interface EdgesObservable { /** - * Return an observable of all edges from a source node. Ordered ascending, from the startTimestamp if specified - * @param gm - * @param sourceNode - * @param edgeType The edge type if specified. Otherwise all types will be used - * @return - */ - Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode, - final Optional<String> edgeType ); - - /** * Get all edges from the source node with the target type * @param gm * @param sourceNode @@ -67,4 +57,15 @@ public interface EdgesObservable { * @return */ Observable<Edge> edgesToTarget(final GraphManager gm, final Id targetNode); + + /** + * Return an observable of all edges from a source node. Ordered descending, resuming after the given edge if specified + * @param gm + * @param sourceNode + * @param edgeType The edge type if specified. Otherwise all types will be used + * @param resume The edge to start seeking after. 
Otherwise starts at the most recent + * @return + */ + Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode, + final Optional<String> edgeType, final Optional<Edge> resume ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java index 7240798..18274ac 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java @@ -72,7 +72,7 @@ public class EdgesObservableImpl implements EdgesObservable { @Override public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode, - final Optional<String> edgeTypeInput ) { + final Optional<String> edgeTypeInput, final Optional<Edge> resume ) { @@ -86,7 +86,7 @@ public class EdgesObservableImpl implements EdgesObservable { return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, - Optional.<Edge>absent() ) ); + resume ) ); } ); }