Refactored observable methods to the correct name. Added a timestamp to the message so that consumers can filter values for faster processing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cb179d35 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cb179d35 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cb179d35 Branch: refs/heads/USERGRID-641 Commit: cb179d3512952203d20c00773789df23e27f147d Parents: b1d9ac2 Author: Todd Nine <[email protected]> Authored: Tue May 12 17:40:20 2015 -0700 Committer: Todd Nine <[email protected]> Committed: Tue May 12 17:40:20 2015 -0700 ---------------------------------------------------------------------- .../asyncevents/EventBuilder.java | 6 +- .../asyncevents/EventBuilderImpl.java | 18 ++- .../asyncevents/InMemoryAsyncEventService.java | 5 +- .../asyncevents/SQSAsyncEventService.java | 9 +- .../index/EntityIndexOperation.java | 46 +++++++ .../index/IndexServiceRequestBuilder.java | 88 +++++++++++++ .../index/IndexServiceRequestBuilderImpl.java | 130 +++++++++++++++++++ .../corepersistence/index/ReIndexAction.java | 2 +- .../corepersistence/index/ReIndexService.java | 13 +- .../index/ReIndexServiceImpl.java | 44 ++++--- .../rx/impl/AllEntityIdsObservable.java | 3 +- .../rx/impl/AllEntityIdsObservableImpl.java | 5 +- .../util/SerializableMapper.java | 91 ------------- .../rx/EdgesToTargetObservableIT.java | 4 +- .../graph/serialization/EdgesObservable.java | 7 +- .../serialization/impl/EdgesObservableImpl.java | 8 +- .../impl/TargetIdObservableImpl.java | 2 +- .../impl/migration/EdgeDataMigrationImpl.java | 2 +- 18 files changed, 342 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java index f48451c..f9f157e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java @@ -22,8 +22,8 @@ package org.apache.usergrid.corepersistence.asyncevents; import java.util.List; +import org.apache.usergrid.corepersistence.index.EntityIndexOperation; import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; @@ -72,10 +72,10 @@ public interface EventBuilder { /** * Re-index an entity in the scope provided - * @param entityIdScope + * @param entityIndexOperation * @return */ - Observable<IndexOperationMessage> index( EntityIdScope entityIdScope ); + Observable<IndexOperationMessage> index( EntityIndexOperation entityIndexOperation ); /** * A bean to hold both our observables so the caller can choose the subscription mechanism. 
Note that http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index c0d82d2..d35ed6d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -25,12 +25,13 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.usergrid.corepersistence.index.EntityIndexOperation; import org.apache.usergrid.corepersistence.index.IndexService; +import org.apache.usergrid.persistence.Schema; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; @@ -38,6 +39,7 @@ import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.field.Field; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -140,14 +142,20 @@ public class EventBuilderImpl 
implements EventBuilder { @Override - public Observable<IndexOperationMessage> index( final EntityIdScope entityIdScope ) { + public Observable<IndexOperationMessage> index( final EntityIndexOperation entityIndexOperation ) { - final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); + final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope(); - final Id entityId = entityIdScope.getId(); + final Id entityId = entityIndexOperation.getId(); //load the entity - return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ) + return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ).filter( + entity -> { + final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED ); + + //only re-index if it has been updated and been updated after our timestamp + return modified != null && modified.getValue() >= entityIndexOperation.getUpdatedSince(); + } ) //perform indexing on the task scheduler and start it .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java index 6faa695..96966bf 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java @@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.asyncevents; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
org.apache.usergrid.corepersistence.index.EntityIndexOperation; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -97,10 +98,10 @@ public class InMemoryAsyncEventService implements AsyncEventService { @Override - public void index( final EntityIdScope entityIdScope ) { + public void index( final EntityIndexOperation entityIndexOperation ) { - run(eventBuilder.index( entityIdScope )); + run(eventBuilder.index( entityIndexOperation )); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java index 415e5e8..1dbfd4e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.usergrid.corepersistence.index.EntityIndexOperation; import org.apache.usergrid.corepersistence.index.IndexProcessorFig; import org.apache.usergrid.corepersistence.index.IndexService; import org.apache.usergrid.exception.NotImplementedException; @@ -184,7 +185,7 @@ public class SQSAsyncEventService implements AsyncEventService { } - @Override +// @Override public void index( final EntityIdScope entityIdScope ) { //queue the re-inex operation offer( entityIdScope ); @@ -346,4 +347,10 @@ public class SQSAsyncEventService implements 
AsyncEventService { subscriptions.add( subscription ); } } + + + @Override + public void index( final EntityIndexOperation entityIdScope ) { + throw new NotImplementedException( "Implement me" ); + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java new file mode 100644 index 0000000..3548bbe --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + + +/** + * The operation for re-indexing an entity. 
The entity should be updated + * with an updated timestamp > updatedSince. + */ +public class EntityIndexOperation extends EntityIdScope { + + private final long updatedSince; + + + public EntityIndexOperation( final ApplicationScope applicationScope, final Id id, final long updatedSince ) { + super( applicationScope, id ); + this.updatedSince = updatedSince; + } + + + public long getUpdatedSince() { + return updatedSince; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java new file mode 100644 index 0000000..07160d8 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.index; + + +import java.util.UUID; + +import org.elasticsearch.action.index.IndexRequestBuilder; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + +import com.google.common.base.Optional; + + +/** + * A builder interface to build our re-index request + */ +public interface IndexServiceRequestBuilder { + + /** + * Set the application id + */ + IndexServiceRequestBuilder withApplicationId( final UUID applicationId ); + + /** + * Set the collection name. If not set, every collection will be reindexed + * @param collectionName + * @return + */ + IndexServiceRequestBuilder withCollection( final String collectionName ); + + /** + * Set our cursor to resume processing + * @param cursor + * @return + */ + IndexServiceRequestBuilder withCursor(final String cursor); + + + /** + * Set the timestamp to re-index entities updated >= this timestamp + * @param timestamp + * @return + */ + IndexServiceRequestBuilder withStartTimestamp(final Long timestamp); + + + /** + * Get the application scope + * @return + */ + Optional<ApplicationScope> getApplicationScope(); + + /** + * Get the collection name + * @return + */ + Optional<String> getCollectionName(); + + /** + * Get the cursor + * @return + */ + Optional<String> getCursor(); + + /** + * Get the updated since timestamp + * @return + */ + Optional<Long> getUpdateTimestamp(); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java new file mode 100644 index 0000000..3466674 --- /dev/null +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import java.util.UUID; + + +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + +import com.google.common.base.Optional; + + +public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilder { + + + /** + * + final Observable<ApplicationScope> applicationScopes = appId.isPresent()? 
Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData(); + + final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() ); + + //create an observable that loads each entity and indexes it, start it running with publish + final ConnectableObservable<EdgeScope> runningReIndex = + allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp ) + + //for each edge, create our scope and index on it + .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish(); + + + + //start our sampler and state persistence + //take a sample every sample interval to allow us to resume state with minimal loss + runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS, + rxTaskScheduler.getAsyncIOScheduler() ) + .doOnNext( edge -> { + + final String serializedState = SerializableMapper.asString( edge ); + + mapManager.putString( newCursor, serializedState, INDEX_TTL ); + } ).subscribe(); + + + */ + + private Optional<UUID> withApplicationId; + private Optional<String> withCollectionName; + private Optional<String> cursor; + private Optional<Long> updateTimestamp; + + + /*** + * + * @param applicationId + * @return + */ + @Override + public IndexServiceRequestBuilder withApplicationId( final UUID applicationId ) { + this.withApplicationId = Optional.fromNullable(applicationId); + return this; + } + + + @Override + public IndexServiceRequestBuilder withCollection( final String collectionName ) { + this.withCollectionName = Optional.fromNullable( collectionName ); + return this; + } + + + @Override + public IndexServiceRequestBuilder withCursor( final String cursor ) { + this.cursor = Optional.fromNullable( cursor ); + return this; + } + + + @Override + public IndexServiceRequestBuilder withStartTimestamp( final Long timestamp ) { + this.updateTimestamp = Optional.fromNullable(timestamp ); + return this; 
+ } + + + @Override + public Optional<ApplicationScope> getApplicationScope() { + + if(this.withApplicationId.isPresent()){ + return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get())); + } + + return Optional.absent(); + } + + + @Override + public Optional<String> getCollectionName() { + return withCollectionName; + } + + + @Override + public Optional<String> getCursor() { + return cursor; + } + + + @Override + public Optional<Long> getUpdateTimestamp() { + return updateTimestamp; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java index b878246..672b3c8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java @@ -33,5 +33,5 @@ public interface ReIndexAction { * Index this entity with the specified scope * @param entityIdScope */ - void index( final EntityIdScope entityIdScope ); + void index( final EntityIndexOperation entityIdScope ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java index e594ad3..b25eca5 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java @@ -45,16 +45,17 @@ public interface ReIndexService { /** * Perform an index rebuild * - * @param appId The applicationId to re-index, or all applications if absent - * @param collection The collection name to re-index. Otherwise all collections in an app will be used. - * @param cursor An optional cursor to resume processing - * @param startTimestamp The time to start indexing from. All edges >= this time will be indexed. + * @param indexServiceRequestBuilder The builder to build the request * @return */ - IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> cursor, - final Optional<Long> startTimestamp); + IndexResponse rebuildIndex(final IndexServiceRequestBuilder indexServiceRequestBuilder); + /** + * Generate a build for the index + * @return + */ + IndexServiceRequestBuilder getBuilder(); /** * The response when requesting a re-index operation http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java index bd1bff9..a2fa09a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -20,7 +20,6 @@ package org.apache.usergrid.corepersistence.index; -import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; @@ -28,7 +27,6 @@ import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable; import 
org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.corepersistence.util.SerializableMapper; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -46,13 +44,11 @@ import com.google.inject.Singleton; import rx.Observable; import rx.observables.ConnectableObservable; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope; - @Singleton public class ReIndexServiceImpl implements ReIndexService { - private static final MapScope RESUME_MAP_SCOPTE = + private static final MapScope RESUME_MAP_SCOPE = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" ); //Keep cursors to resume re-index for 1 day. This is far beyond it's useful real world implications anyway. 
@@ -78,31 +74,41 @@ public class ReIndexServiceImpl implements ReIndexService { this.rxTaskScheduler = rxTaskScheduler; this.indexService = indexService; - this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPTE ); + this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE ); } + + @Override - public IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> cursor, - final Optional<Long> startTimestamp ) { + public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) { - //load our last emitted Scope if a cursor is present - if ( cursor.isPresent() ) { + //load our last emitted Scope if a cursor is present + if ( indexServiceRequestBuilder.getCursor().isPresent() ) { throw new UnsupportedOperationException( "Build this" ); } - final Observable<ApplicationScope> applicationScopes = appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData(); + final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope(); + final Observable<ApplicationScope> applicationScopes = appId.isPresent()? 
Observable.just( appId.get() ) : allApplicationsObservable.getData(); + + + final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() ); + final long modifiedSince = indexServiceRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE ); + //create an observable that loads each entity and indexes it, start it running with publish final ConnectableObservable<EdgeScope> runningReIndex = - allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp ) + allEntityIdsObservable.getEdgesToEntities( applicationScopes, + indexServiceRequestBuilder.getCollectionName() ) //for each edge, create our scope and index on it - .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish(); + .doOnNext( edge -> indexService.index( + new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(), + modifiedSince ) ) ).publish(); @@ -112,9 +118,9 @@ public class ReIndexServiceImpl implements ReIndexService { rxTaskScheduler.getAsyncIOScheduler() ) .doOnNext( edge -> { - final String serializedState = SerializableMapper.asString( edge ); - - mapManager.putString( newCursor, serializedState, INDEX_TTL ); +// final String serializedState = SerializableMapper.asString( edge ); +// +// mapManager.putString( newCursor, serializedState, INDEX_TTL ); } ).subscribe(); @@ -124,6 +130,12 @@ public class ReIndexServiceImpl implements ReIndexService { return new IndexResponse( newCursor, runningReIndex ); } + + + @Override + public IndexServiceRequestBuilder getBuilder() { + return new IndexServiceRequestBuilderImpl(); + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java index b9e5373..aada240 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java @@ -44,9 +44,8 @@ public interface AllEntityIdsObservable { * Get all edges that represent edges to entities in the system * @param appScopes * @param edgeType The edge type to use (if specified) - * @param startTime The time to start with * @return */ - Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Long> startTime); + Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java index 257fab1..6a95e7b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java @@ -81,12 +81,13 @@ public class AllEntityIdsObservableImpl implements AllEntityIdsObservable { @Override - public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Long> startTime) { + public Observable<EdgeScope> getEdgesToEntities( final 
Observable<ApplicationScope> appScopes, final Optional<String> edgeType) { return appScopes.flatMap( applicationScope -> { final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); - return edgesObservable.edgesFromSourceAscending( gm, applicationScope.getApplication(),edgeType, startTime ).map( edge -> new EdgeScope(applicationScope, edge )); + return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType ) + .map( edge -> new EdgeScope(applicationScope, edge )); } ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java deleted file mode 100644 index 19ecf6d..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.util; - - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.Serializable; -import java.nio.charset.StandardCharsets; - -import org.apache.usergrid.persistence.collection.serialization.SerializationFig; -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.smile.SmileFactory; -import com.google.common.base.Preconditions; - - -/** - * A simple utility for serializing serializable classes to/and from strings. To be used for small object storage only, such as resume on re-index - * storing data such as entities should be specialized. - */ -public class SerializableMapper { - - private static final SmileFactory SMILE_FACTORY = new SmileFactory(); - - private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY ); - - static{ - MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" ); - SMILE_FACTORY.delegateToTextual( true ); - } - - /** - * Get value as a string - * @param toSerialize - * @param <T> - * @return - */ - public static <T extends Serializable> String asString(final T toSerialize){ - try { - return MAPPER.writeValueAsString( toSerialize ); - } - catch ( JsonProcessingException e ) { - throw new RuntimeException( "Unable to process json", e ); - } - } - - - /** - * Write the value as a string - * @param <T> - * @param serialized - * @param clazz - * @return - */ - public static <T extends Serializable> T fromString(final String serialized, final Class<T> clazz){ - Preconditions.checkNotNull(serialized, "serialized string cannot be null"); - - - InputStream stream = new ByteArrayInputStream(serialized.getBytes( StandardCharsets.UTF_8)); - - try { - return MAPPER.readValue( stream, clazz ); - } - catch ( 
IOException e ) { - throw new RuntimeException( String.format("Unable to parse string '%s'", serialized), e ); - } - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java index 92f2b01..9e84219 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java @@ -89,7 +89,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT { final GraphManager gm = managerCache.getGraphManager( scope ); - edgesFromSourceObservable.edgesFromSourceAscending( gm, applicationId ).doOnNext( edge -> { + edgesFromSourceObservable.edgesFromSourceDescending( gm, applicationId ).doOnNext( edge -> { final String edgeType = edge.getType(); final Id target = edge.getTargetNode(); @@ -118,7 +118,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT { //test connections - edgesFromSourceObservable.edgesFromSourceAscending( gm, source ).doOnNext( edge -> { + edgesFromSourceObservable.edgesFromSourceDescending( gm, source ).doOnNext( edge -> { final String edgeType = edge.getType(); final Id target = edge.getTargetNode(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java index 9f0bd60..964e13d 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java @@ -38,7 +38,7 @@ public interface EdgesObservable { * @param sourceNode * @return */ - Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode ); + Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode ); /** @@ -46,11 +46,10 @@ public interface EdgesObservable { * @param gm * @param sourceNode * @param edgeType The edge type if specified. Otherwise all types will be used - * @param startTimestamp The start timestamp if specfiied, otherwise Long.MIN will be used * @return */ - Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode,final Optional<String> edgeType, - final Optional<Long> startTimestamp ); + Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode, + final Optional<String> edgeType ); /** * Get all edges from the source node with the target type http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java index df9e094..7240798 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java +++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java @@ -55,7 +55,7 @@ public class EdgesObservableImpl implements EdgesObservable { * Get all edges from the source */ @Override - public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode ) { + public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode ) { final Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) ); @@ -71,8 +71,8 @@ public class EdgesObservableImpl implements EdgesObservable { @Override - public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode, final Optional<String> edgeTypeInput, - final Optional<Long> startTimestamp ) { + public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode, + final Optional<String> edgeTypeInput ) { @@ -85,7 +85,7 @@ public class EdgesObservableImpl implements EdgesObservable { logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode ); return gm.loadEdgesFromSource( - new SimpleSearchByEdgeType( sourceNode, edgeType, startTimestamp.or( Long.MIN_VALUE ), SearchByEdgeType.Order.ASCENDING, + new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java index 5cf5117..82c7d54 100644 --- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java @@ -55,7 +55,7 @@ public class TargetIdObservableImpl implements TargetIdObservable { public Observable<Id> getTargetNodes(final GraphManager gm, final Id sourceNode) { //only search edge types that start with collections - return edgesFromSourceObservable.edgesFromSourceAscending( gm, sourceNode ).map( new Func1<Edge, Id>() { + return edgesFromSourceObservable.edgesFromSourceDescending( gm, sourceNode ).map( new Func1<Edge, Id>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java index 0df26ff..d6c42e3 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java @@ -87,7 +87,7 @@ public class EdgeDataMigrationImpl implements DataMigration<GraphNode> { final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope ); //get edges from the source - return edgesFromSourceObservable.edgesFromSourceAscending( gm, graphNode.entryNode ).buffer( 1000 ) + return edgesFromSourceObservable.edgesFromSourceDescending( gm, graphNode.entryNode ).buffer( 
1000 ) .doOnNext( edges -> { final MutationBatch batch = keyspace.prepareMutationBatch();
