Ensure indexBatch works with the new model for indexing.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3c399e79 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3c399e79 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3c399e79 Branch: refs/heads/master Commit: 3c399e790e16609bdc4a76853fcbc5ad562e8979 Parents: b4634dc Author: Michael Russo <[email protected]> Authored: Fri Feb 19 23:06:30 2016 -0800 Committer: Michael Russo <[email protected]> Committed: Fri Feb 19 23:06:30 2016 -0800 ---------------------------------------------------------------------- .../asyncevents/AsyncEventServiceImpl.java | 68 +++++++++++++------ .../asyncevents/model/AsyncEvent.java | 2 - .../asyncevents/model/EdgeIndexEvent.java | 70 -------------------- .../asyncevents/model/EntityIndexEvent.java | 54 --------------- 4 files changed, 49 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index e101761..dac3651 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -36,7 +36,6 @@ import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent; import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent; import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent; import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent; -import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent; import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent; import org.apache.usergrid.corepersistence.index.EntityIndexOperation; import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; @@ -298,12 +297,12 @@ public class AsyncEventServiceImpl implements AsyncEventService { } catch (ClassCastException cce) { logger.error("Failed to deserialize message body", cce); - return new IndexEventResult(Optional.absent(), System.currentTimeMillis()); + return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis()); } if (event == null) { logger.error("AsyncEvent type or event is null!"); - return new IndexEventResult(Optional.absent(), System.currentTimeMillis()); + return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis()); } final AsyncEvent thisEvent = event; @@ -312,6 +311,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { logger.debug("Processing {} event", event); } + IndexOperationMessage indexOperationMessage = null; try { // deletes are 2-part, actual IO to delete data, then queue up a de-index @@ -332,7 +332,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { // this is the main event that pulls the index doc from map persistence and hands to the index producer else if (event instanceof ElasticsearchIndexEvent) { - handleIndexOperation((ElasticsearchIndexEvent) event); + indexOperationMessage = handleIndexOperation((ElasticsearchIndexEvent) event); } else { @@ -341,20 +341,20 @@ public class AsyncEventServiceImpl implements AsyncEventService { //return type that can be indexed and ack'd later - return new IndexEventResult(Optional.of(message), thisEvent.getCreationTime()); + return new IndexEventResult(Optional.fromNullable(indexOperationMessage), Optional.of(message), thisEvent.getCreationTime()); } catch (IndexDocNotFoundException e){ // this exception is throw when we wait before trying quorum read on map persistence. // return empty event result so the event's message doesn't get ack'd logger.info(e.getMessage()); - return new IndexEventResult(Optional.absent(), event.getCreationTime()); + return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime()); } catch (Exception e) { // if the event fails to process, log the message and return empty event result so it doesn't get ack'd logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e); - return new IndexEventResult(Optional.absent(), event.getCreationTime()); + return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime()); } }); @@ -407,6 +407,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { public void queueDeleteEdge(final ApplicationScope applicationScope, final Edge edge) { + // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) ); } @@ -471,7 +472,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { offerTopic( elasticsearchIndexEvent ); } - public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){ + public IndexOperationMessage handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){ Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" ); final UUID messageId = elasticsearchIndexEvent.getIndexBatchId(); @@ -525,7 +526,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { //now execute it - indexProducer.put(indexOperationMessage).toBlocking().last(); + return indexOperationMessage; } @@ -568,6 +569,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { @Override public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) { + // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ); } @@ -699,7 +701,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { try { List<IndexEventResult> indexEventResults = callEventHandlers( messages ); - List<QueueMessage> messagesToAck = ackMessages( indexEventResults ); + List<QueueMessage> messagesToAck = submitToIndex( indexEventResults ); if ( messagesToAck == null || messagesToAck.size() == 0 ) { logger.error( @@ -738,17 +740,21 @@ public class AsyncEventServiceImpl implements AsyncEventService { * @param indexEventResults * @return */ - private List<QueueMessage> ackMessages(List<IndexEventResult> indexEventResults) { + private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) { //if nothing came back then return null if(indexEventResults==null){ return null; } + IndexOperationMessage combined = new IndexOperationMessage(); // stream the messages to record the cycle time - return indexEventResults.stream() + List<QueueMessage> queueMessages = indexEventResults.stream() .map(indexEventResult -> { //record the cycle time messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime()); + if(indexEventResult.getIndexOperationMessage().isPresent()){ + combined.ingest(indexEventResult.getIndexOperationMessage().get()); + } return indexEventResult; }) // filter out messages that are not present, they were not processed and put into the results @@ -756,34 +762,58 @@ public class AsyncEventServiceImpl implements AsyncEventService { .map(result -> result.getQueueMessage().get()) // collect .collect(Collectors.toList()); + + // sumbit the requests to Elasticsearch + indexProducer.put(combined).toBlocking().last(); + + return queueMessages; } public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) { - //change to id scope to avoid serialization issues - offer( new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, id ), updatedSince ) ); + + EntityIndexOperation entityIndexOperation = + new EntityIndexOperation( applicationScope, id, updatedSince); + + queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null)); } public void indexBatch(final List<EdgeScope> edges, final long updatedSince) { - List batch = new ArrayList<EdgeScope>(); + IndexOperationMessage batch = new IndexOperationMessage(); + for ( EdgeScope e : edges){ - //change to id scope to avoid serialization issues - batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince)); + + EntityIndexOperation entityIndexOperation = + new EntityIndexOperation( e.getApplicationScope(), e.getEdge().getTargetNode(), updatedSince); + + IndexOperationMessage indexOperationMessage = + eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null); + + if (indexOperationMessage != null){ + batch.ingest(indexOperationMessage); + } + } - offerBatch( batch ); + + queueIndexOperationMessage(batch); } public class IndexEventResult{ + private final Optional<IndexOperationMessage> indexOperationMessage; private final Optional<QueueMessage> queueMessage; private final long creationTime; - public IndexEventResult(Optional<QueueMessage> queueMessage, long creationTime){ + public IndexEventResult(Optional<IndexOperationMessage> indexOperationMessage, Optional<QueueMessage> queueMessage, long creationTime){ this.queueMessage = queueMessage; this.creationTime = creationTime; + this.indexOperationMessage = indexOperationMessage; } + public Optional<IndexOperationMessage> getIndexOperationMessage() { + return indexOperationMessage; + } public Optional<QueueMessage> getQueueMessage() { return queueMessage; http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java index 5f86410..57b5812 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java @@ -40,9 +40,7 @@ import org.apache.usergrid.persistence.queue.QueueFig; @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" ) @JsonSubTypes( { @JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ), - @JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ), @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ), - @JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ), @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ), @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" ) } ) http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java deleted file mode 100644 index 6164dce..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.asyncevents.model; - - -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.model.entity.Id; - - -public final class EdgeIndexEvent - extends AsyncEvent { - - - @JsonProperty - protected ApplicationScope applicationScope; - - @JsonProperty - protected Id entityId; - - @JsonProperty - protected Edge edge; - - /** - * Needed by jackson - */ - public EdgeIndexEvent() { - super(); - } - - public EdgeIndexEvent(String sourceRegion, ApplicationScope applicationScope, Id entityId, Edge edge) { - super(sourceRegion); - this.applicationScope = applicationScope; - this.entityId = entityId; - this.edge = edge; - } - - - public ApplicationScope getApplicationScope() { - return applicationScope; - } - - - public Edge getEdge() { - return edge; - } - - - public Id getEntityId() { - return entityId; - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java deleted file mode 100644 index 7e8184b..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.asyncevents.model; - -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; - - -public final class EntityIndexEvent extends AsyncEvent { - - - @JsonProperty - protected EntityIdScope entityIdScope; - - @JsonProperty - private long updatedAfter; - - public EntityIndexEvent() { - super(); - } - - public EntityIndexEvent(String sourceRegion, EntityIdScope entityIdScope, final long updatedAfter ) { - super(sourceRegion); - this.entityIdScope = entityIdScope; - this.updatedAfter = updatedAfter; - } - - - public long getUpdatedAfter() { - return updatedAfter; - } - - - public EntityIdScope getEntityIdScope() { - return entityIdScope; - } -}
