Repository: usergrid Updated Branches: refs/heads/release-2.1.1 a46224419 -> 10ff27cc8
Make the initial write path of indexing require less I/O. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/10ff27cc Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/10ff27cc Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/10ff27cc Branch: refs/heads/release-2.1.1 Commit: 10ff27cc86923a24c52e881f821870867dd95212 Parents: a462244 Author: Michael Russo <[email protected]> Authored: Wed Apr 13 22:42:12 2016 +0200 Committer: Michael Russo <[email protected]> Committed: Wed Apr 13 22:42:12 2016 +0200 ---------------------------------------------------------------------- .../asyncevents/AsyncEventServiceImpl.java | 102 ++++++++++++++----- .../asyncevents/model/AsyncEvent.java | 2 + .../asyncevents/model/EdgeIndexEvent.java | 70 +++++++++++++ .../asyncevents/model/EntityIndexEvent.java | 54 ++++++++++ 4 files changed, 200 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ff27cc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 82ad5be..3b01292 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -310,20 +310,33 @@ public class AsyncEventServiceImpl implements AsyncEventService { logger.debug("Processing event with type {}", event.getClass().getSimpleName()); } - IndexOperationMessage indexOperationMessage = null; try { + IndexOperationMessage single = new 
IndexOperationMessage(); + + // normal indexing event for an entity + if ( event instanceof EntityIndexEvent ){ + + single = handleEntityIndexUpdate( message ); + + } + // normal indexing event for an edge + else if ( event instanceof EdgeIndexEvent ){ + + single = handleEdgeIndex( message ); + + } // deletes are 2-part, actual IO to delete data, then queue up a de-index - if ( event instanceof EdgeDeleteEvent ) { + else if ( event instanceof EdgeDeleteEvent ) { - handleEdgeDelete( message ); + single = handleEdgeDelete( message ); } // deletes are 2-part, actual IO to delete data, then queue up a de-index else if ( event instanceof EntityDeleteEvent ) { - handleEntityDelete( message ); + single = handleEntityDelete( message ); } - // application initialization has special logic, therefore a special event type + // initialization has special logic, therefore a special event type and no index operation message else if ( event instanceof InitializeApplicationIndexEvent ) { handleInitializeApplicationIndex(event, message); @@ -331,11 +344,11 @@ public class AsyncEventServiceImpl implements AsyncEventService { // this is the main event that pulls the index doc from map persistence and hands to the index producer else if (event instanceof ElasticsearchIndexEvent) { - indexOperationMessage = handleIndexOperation((ElasticsearchIndexEvent) event); + handleIndexOperation((ElasticsearchIndexEvent) event); } else if (event instanceof DeIndexOldVersionsEvent) { - indexOperationMessage = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event); + single = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event); } else { @@ -343,9 +356,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { } - // returning indexOperationMessage will send that indexOperationMessage to the index producer // if no exception happens and the QueueMessage is returned in these results, it will get ack'd - return new 
IndexEventResult(Optional.fromNullable(indexOperationMessage), Optional.of(message), thisEvent.getCreationTime()); + return new IndexEventResult(Optional.of(single), Optional.of(message), thisEvent.getCreationTime()); } catch (IndexDocNotFoundException e){ @@ -382,6 +394,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { final Entity entity, long updatedAfter) { + offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),new EntityIdScope(applicationScope, entity.getId()), 0)); + final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entity.getId(), updatedAfter); @@ -392,19 +406,56 @@ public class AsyncEventServiceImpl implements AsyncEventService { } + private IndexOperationMessage handleEntityIndexUpdate(final QueueMessage message) { + + Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" ); + + final AsyncEvent event = ( AsyncEvent ) message.getBody(); + + Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate"); + Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass())); + + final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event; + + + //process the entity immediately + //only process the same version, otherwise ignore + final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope(); + final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); + final Id entityId = entityIdScope.getId(); + final long updatedAfter = entityIndexEvent.getUpdatedAfter(); + + final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter); + + return eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null); + } + @Override public void queueNewEdge(final ApplicationScope applicationScope, final Entity entity, 
final Edge newEdge) { - final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); + offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge )); - final IndexOperationMessage indexMessage = ecm.load( entity.getId() ) - .flatMap( loadedEntity -> eventBuilder.buildNewEdge(applicationScope, entity, newEdge) ) - .toBlocking().lastOrDefault(null); + } - queueIndexOperationMessage( indexMessage ); + private IndexOperationMessage handleEdgeIndex(final QueueMessage message) { + + Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" ); + + final AsyncEvent event = (AsyncEvent) message.getBody(); + + Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" ); + Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass())); + + final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event; + + final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( edgeIndexEvent.getApplicationScope() ); + + return ecm.load( edgeIndexEvent.getEntityId() ) + .flatMap( loadedEntity -> eventBuilder.buildNewEdge(edgeIndexEvent.getApplicationScope(), loadedEntity, edgeIndexEvent.getEdge()) ) + .toBlocking().lastOrDefault(null); } @@ -417,7 +468,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) ); } - public void handleEdgeDelete(final QueueMessage message) { + private IndexOperationMessage handleEdgeDelete(final QueueMessage message) { Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" ); @@ -436,10 +487,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge); } - 
IndexOperationMessage indexMessage = - eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null); - - queueIndexOperationMessage(indexMessage); + return eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null); } @@ -478,7 +526,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { offerTopic( elasticsearchIndexEvent ); } - public IndexOperationMessage handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){ + private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) throws IndexDocNotFoundException { Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" ); @@ -529,7 +577,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { // always do a check to ensure the indexes are initialized for the index requests initializeEntityIndexes(indexOperationMessage); - return indexOperationMessage; + // send it to to be indexed + indexProducer.put(indexOperationMessage).toBlocking().last(); } @@ -599,7 +648,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ); } - public void handleEntityDelete(final QueueMessage message) { + private IndexOperationMessage handleEntityDelete(final QueueMessage message) { Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete"); @@ -625,14 +674,12 @@ public class AsyncEventServiceImpl implements AsyncEventService { entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null); - IndexOperationMessage indexMessage = entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null); - - queueIndexOperationMessage(indexMessage); + return entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null); } - public void handleInitializeApplicationIndex(final AsyncEvent event, final 
QueueMessage message) { + private void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) { Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex"); Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass())); @@ -793,8 +840,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { // collect into a list of QueueMessages that can be ack'd later .collect(Collectors.toList()); - // sumbit the requests to Elasticsearch - indexProducer.put(combined).toBlocking().last(); + queueIndexOperationMessage(combined); return queueMessages; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ff27cc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java index 7b2b228..bd581ad 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java @@ -38,6 +38,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; @JsonIgnoreProperties( ignoreUnknown = true ) @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" ) @JsonSubTypes( { + @JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ), + @JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ), @JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ), @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ), 
@JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ), http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ff27cc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java new file mode 100644 index 0000000..6164dce --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.asyncevents.model; + + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.model.entity.Id; + + +public final class EdgeIndexEvent + extends AsyncEvent { + + + @JsonProperty + protected ApplicationScope applicationScope; + + @JsonProperty + protected Id entityId; + + @JsonProperty + protected Edge edge; + + /** + * Needed by jackson + */ + public EdgeIndexEvent() { + super(); + } + + public EdgeIndexEvent(String sourceRegion, ApplicationScope applicationScope, Id entityId, Edge edge) { + super(sourceRegion); + this.applicationScope = applicationScope; + this.entityId = entityId; + this.edge = edge; + } + + + public ApplicationScope getApplicationScope() { + return applicationScope; + } + + + public Edge getEdge() { + return edge; + } + + + public Id getEntityId() { + return entityId; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ff27cc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java new file mode 100644 index 0000000..7e8184b --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; + + +public final class EntityIndexEvent extends AsyncEvent { + + + @JsonProperty + protected EntityIdScope entityIdScope; + + @JsonProperty + private long updatedAfter; + + public EntityIndexEvent() { + super(); + } + + public EntityIndexEvent(String sourceRegion, EntityIdScope entityIdScope, final long updatedAfter ) { + super(sourceRegion); + this.entityIdScope = entityIdScope; + this.updatedAfter = updatedAfter; + } + + + public long getUpdatedAfter() { + return updatedAfter; + } + + + public EntityIdScope getEntityIdScope() { + return entityIdScope; + } +}
