Revert "Add ability to start initial re-index seek with the UNIX timestamp. This will only start seeking from the time provided, rather than seeking all and discarding what doesn't match a filter."
This reverts commit 85cc93436a163c3ba21a7ac1286c6bce3daebeb4. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5db402d5 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5db402d5 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5db402d5 Branch: refs/heads/asf-site Commit: 5db402d53e40b99901b2a97894cbdd77e60881b3 Parents: 85cc934 Author: Michael Russo <[email protected]> Authored: Mon Oct 3 21:46:46 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Mon Oct 3 21:46:46 2016 -0700 ---------------------------------------------------------------------- .../corepersistence/index/ReIndexServiceImpl.java | 18 ++++-------------- .../rx/impl/AllEntityIdsObservable.java | 6 +----- .../rx/impl/AllEntityIdsObservableImpl.java | 7 ++----- .../graph/serialization/EdgesObservable.java | 4 +--- .../serialization/impl/EdgesObservableImpl.java | 16 +++------------- .../usergrid/rest/system/IndexResource.java | 12 ++++++------ 6 files changed, 17 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java index f37f9af..19fbcfa 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -135,17 +135,7 @@ public class ReIndexServiceImpl implements ReIndexService { final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() ); - final long startTimestamp; - if ( reIndexRequestBuilder.getUpdateTimestamp().isPresent() && reIndexRequestBuilder.getUpdateTimestamp().get() > 0 ){ - - // edge timestamps are UUID timestamps, we need to convert from UNIX epoch to a UUID timestamp - long uuidEpochNanos = 0x01b21dd213814000L; // num 100 nano seconds since uuid epoch - startTimestamp = reIndexRequestBuilder.getUpdateTimestamp().get()*10000 + uuidEpochNanos; - logger.info("Reindex provided with from timestamp, converted to an Edge timestamp is: {}", startTimestamp); - }else{ - startTimestamp = 0; - } - + final long modifiedSince = reIndexRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE ); // create an observable that loads a batch to be indexed @@ -175,11 +165,11 @@ public class ReIndexServiceImpl implements ReIndexService { } allEntityIdsObservable.getEdgesToEntities( applicationScopes, - reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue(), startTimestamp ) + reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() ) .buffer( indexProcessorFig.getReindexBufferSize()) .doOnNext( edgeScopes -> { logger.info("Sending batch of {} to be indexed.", edgeScopes.size()); - indexService.indexBatch(edgeScopes, startTimestamp); + indexService.indexBatch(edgeScopes, modifiedSince); count.addAndGet(edgeScopes.size() ); if( edgeScopes.size() > 0 ) { writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1)); @@ -188,7 +178,7 @@ public class ReIndexServiceImpl implements ReIndexService { .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() )) .subscribeOn( Schedulers.io() ).subscribe(); - + return new ReIndexStatus( jobId, Status.STARTED, 0, 0 ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java index fe7a455..9070609 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java @@ -46,12 +46,8 @@ public interface AllEntityIdsObservable { * @param appScopes * @param edgeType The edge type to use (if specified) * @param lastEdge The edge to resume processing from - * @param startTimestamp An optional unix timestamp to start the seek ( it will be converted to an Edge ) * @return */ - Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, - final Optional<String> edgeType, - Optional<Edge> lastEdge, - final long startTimestamp ); + Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java index e6f3633..0420a32 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java @@ -82,15 +82,12 @@ public class AllEntityIdsObservableImpl implements AllEntityIdsObservable { @Override - public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, - final Optional<String> edgeType, - final Optional<Edge> lastEdge, - final long startTimestamp ) { + public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge) { return appScopes.flatMap( applicationScope -> { final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); - return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType, lastEdge, startTimestamp ) + return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType, lastEdge ) .map( edge -> new EdgeScope(applicationScope, edge )); } ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java index 7c83207..78a1d4b 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java @@ -64,10 +64,8 @@ public interface EdgesObservable { * @param sourceNode * @param edgeType The edge type if specified. Otherwise all types will be used * @param resume The edge to start seeking after. Otherwise starts at the most recent - * @param startTimestamp A unix timestamp to start seeking from if you don't have the edge cursor * @return */ Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode, - final Optional<String> edgeType, final Optional<Edge> resume, - final long startTimestamp ); + final Optional<String> edgeType, final Optional<Edge> resume ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java index 2504e87..20efe42 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java @@ -20,7 +20,6 @@ package org.apache.usergrid.persistence.graph.serialization.impl; -import org.apache.usergrid.persistence.graph.impl.SimpleEdge; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,8 +74,8 @@ public class EdgesObservableImpl implements EdgesObservable { @Override public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode, - final Optional<String> edgeTypeInput, final Optional<Edge> resume, - final long startTimestamp ) { + final Optional<String> edgeTypeInput, final Optional<Edge> resume ) { + final Observable<String> edgeTypes = edgeTypeInput.isPresent()? Observable.just( edgeTypeInput.get() ): @@ -85,22 +84,13 @@ public class EdgesObservableImpl implements EdgesObservable { return edgeTypes.flatMap( edgeType -> { - final Optional<Edge> start; - - if( !resume.isPresent() && startTimestamp > 0 ){ - // the target node doesn't matter here, the search only looks at the timestamp - start = Optional.of(new SimpleEdge(sourceNode, edgeType, sourceNode, startTimestamp)); - }else{ - start = resume; - } - if (logger.isTraceEnabled()) { logger.trace("Loading edges of edgeType {} from {}", edgeType, sourceNode); } return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, - start ) ); + resume ) ); } ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java index 2be5b87..be60177 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java @@ -61,7 +61,7 @@ import java.util.UUID; public class IndexResource extends AbstractContextResource { private static final Logger logger = LoggerFactory.getLogger( IndexResource.class ); - private static final String SINCE_FIELD = "since"; + private static final String UPDATED_FIELD = "updated"; @@ -321,17 +321,17 @@ public class IndexResource extends AbstractContextResource { final String callback ) { Map<String,Object> newPayload = payload; - if(newPayload == null || !payload.containsKey(SINCE_FIELD)){ + if(newPayload == null || !payload.containsKey( UPDATED_FIELD )){ newPayload = new HashMap<>(1); - newPayload.put(SINCE_FIELD,0); + newPayload.put(UPDATED_FIELD,0); } - Preconditions.checkArgument(newPayload.get(SINCE_FIELD) instanceof Number, + Preconditions.checkArgument(newPayload.get(UPDATED_FIELD) instanceof Number, "You must specified the field \"updated\" in the payload and it must be a timestamp" ); //add our updated timestamp to the request - if ( newPayload.containsKey(SINCE_FIELD) ) { - final long timestamp = ConversionUtils.getLong(newPayload.get(SINCE_FIELD)); + if ( newPayload.containsKey( UPDATED_FIELD ) ) { + final long timestamp = ConversionUtils.getLong(newPayload.get(UPDATED_FIELD)); request.withStartTimestamp( timestamp ); }
