Add option to include old version in result Add debug options Add gzip support
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/000eaaad Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/000eaaad Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/000eaaad Branch: refs/heads/master Commit: 000eaaadfd83ace702d99fb740b6b0129688d9f7 Parents: 7139781 Author: Peter Johnson <[email protected]> Authored: Tue Nov 7 12:29:50 2017 -0800 Committer: Peter Johnson <[email protected]> Committed: Mon Nov 13 21:37:43 2017 -0800 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 15 +- .../corepersistence/CpRelationManager.java | 26 +-- .../asyncevents/AsyncEventService.java | 6 +- .../asyncevents/AsyncEventServiceImpl.java | 27 ++- .../asyncevents/direct/BufferedQueueImpl.java | 191 ------------------- .../asyncevents/direct/BufferedQueueNOP.java | 3 + .../direct/DirectFirstEventServiceImpl.java | 54 +++--- .../corepersistence/index/IndexingStrategy.java | 69 ------- .../corepersistence/pipeline/Pipeline.java | 9 +- .../pipeline/PipelineContext.java | 21 +- .../pipeline/builder/CandidateBuilder.java | 13 +- .../pipeline/builder/IdBuilder.java | 10 +- .../pipeline/builder/PipelineBuilder.java | 19 +- .../read/search/CandidateEntityFilter.java | 66 +++++-- .../service/CollectionSearch.java | 9 + .../service/CollectionServiceImpl.java | 12 +- .../corepersistence/util/CpCollectionUtils.java | 49 +++-- .../persistence/CollectionDeleteTest.java | 2 +- .../usergrid/persistence/index/EntityIndex.java | 1 - .../index/impl/EsEntityIndexImpl.java | 2 +- .../persistence/index/impl/EntityIndexTest.java | 1 + .../persistence/queue/LegacyQueueFig.java | 9 +- .../queue/impl/SNSQueueManagerImpl.java | 10 +- .../queue/settings/IndexConsistency.java | 64 +++++++ .../queue/settings/QueueIndexingStrategy.java | 80 ++++++++ .../rest/interceptors/GZIPInterceptor.java | 79 ++++++++ .../interceptors/GZIPWriterInterceptor.java | 78 -------- .../collection/CollectionsResourceIT.java | 151 +++++++++++++++ 28 files changed, 629 insertions(+), 447 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 1c979d6..7a4c781 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -32,7 +32,6 @@ import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.index.CollectionSettings; import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory; import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl; -import org.apache.usergrid.corepersistence.index.IndexingStrategy; import org.apache.usergrid.corepersistence.service.CollectionService; import org.apache.usergrid.corepersistence.service.ConnectionService; import org.apache.usergrid.corepersistence.util.CpCollectionUtils; @@ -70,6 +69,7 @@ import org.apache.usergrid.persistence.model.field.Field; import org.apache.usergrid.persistence.model.field.StringField; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.mq.Message; +import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy; import org.apache.usergrid.utils.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -524,7 +524,7 @@ public class CpEntityManager implements EntityManager { String entityType = cpEntity.getId().getType(); boolean skipIndexingForType = skipIndexingForType(entityType); - IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType); + QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType(entityType); try { @@ -552,14 +552,14 @@ public class CpEntityManager implements EntityManager { } if (!skipIndexingForType) { - indexEntity(cpEntity, indexingStrategy); + indexEntity(cpEntity, queueIndexingStrategy); deIndexOldVersionsOfEntity(cpEntity); } } - private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, IndexingStrategy indexingStrategy) { + private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, QueueIndexingStrategy queueIndexingStrategy) { // queue an event to update the new entity - indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , indexingStrategy); + indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , queueIndexingStrategy); } private void deIndexOldVersionsOfEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity) { @@ -569,7 +569,7 @@ public class CpEntityManager implements EntityManager { } } - private IndexingStrategy getIndexingStrategyForType(String type ) { + private QueueIndexingStrategy getIndexingStrategyForType(String type ) { return CpCollectionUtils.getIndexingStrategyForType(collectionSettingsFactory, applicationId, type); } @@ -1813,7 +1813,8 @@ public class CpEntityManager implements EntityManager { for (String validName : CpCollectionUtils.getValidSettings()) { if (newSettings.containsKey(validName)) { - updatedSettings.put(validName, newSettings.get(validName)); + Object value = CpCollectionUtils.validateValue(validName, newSettings.get(validName)); + updatedSettings.put(validName, value); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index a23d6ac..e329c29 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -20,10 +20,7 @@ package org.apache.usergrid.corepersistence; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; -import org.apache.usergrid.corepersistence.index.CollectionSettings; import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory; -import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl; -import org.apache.usergrid.corepersistence.index.IndexingStrategy; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor; import org.apache.usergrid.corepersistence.results.EntityQueryExecutor; @@ -50,6 +47,8 @@ import org.apache.usergrid.persistence.map.MapManager; import org.apache.usergrid.persistence.map.MapScope; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.persistence.queue.settings.IndexConsistency; +import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy; import org.apache.usergrid.persistence.schema.CollectionInfo; import org.apache.usergrid.utils.InflectionUtils; import org.apache.usergrid.utils.MapUtils; @@ -62,7 +61,6 @@ import java.util.*; import static org.apache.usergrid.corepersistence.util.CpNamingUtils.*; import static org.apache.usergrid.persistence.Schema.*; -import static org.apache.usergrid.utils.ClassUtils.cast; import static org.apache.usergrid.utils.InflectionUtils.singularize; import static org.apache.usergrid.utils.MapUtils.addMapSet; @@ -397,8 +395,8 @@ public class CpRelationManager implements RelationManager { String entityType = cpHeadEntity.getId().getType(); if ( !skipIndexingForType( entityType) ) { - IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType); - indexService.queueNewEdge(applicationScope, cpHeadEntity.getId(), reverseEdge, indexingStrategy); + QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType(entityType); + indexService.queueNewEdge(applicationScope, cpHeadEntity.getId(), reverseEdge, queueIndexingStrategy); } } ); @@ -406,8 +404,8 @@ public class CpRelationManager implements RelationManager { String entityType = memberEntity.getId().getType(); if ( !skipIndexingForType( entityType ) ) { - IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType); - indexService.queueNewEdge(applicationScope, memberEntityId, edge, indexingStrategy); + QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType(entityType); + indexService.queueNewEdge(applicationScope, memberEntityId, edge, queueIndexingStrategy); } @@ -668,6 +666,8 @@ public class CpRelationManager implements RelationManager { queryString, cursor ); search.setAnalyzeOnly(analyzeOnly); + IndexConsistency indexConsistency = getIndexConsistencyForType(collectionName); + search.setKeepStaleEntries(indexConsistency == IndexConsistency.LATEST); return collectionService.searchCollection( search ); } @@ -738,8 +738,8 @@ public class CpRelationManager implements RelationManager { String entityType = targetEntity.getId().getType(); if ( !skipIndexingForType( entityType ) ) { - IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType); - indexService.queueNewEdge(applicationScope, targetEntity.getId(), edge, indexingStrategy); + QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType(entityType); + indexService.queueNewEdge(applicationScope, targetEntity.getId(), edge, queueIndexingStrategy); } // remove any duplicate edges (keeps the duplicate edge with same timestamp) @@ -1100,7 +1100,11 @@ public class CpRelationManager implements RelationManager { } - private IndexingStrategy getIndexingStrategyForType(String type ) { + private IndexConsistency getIndexConsistencyForType(String type ) { + return CpCollectionUtils.getIndexConsistencyForType(collectionSettingsFactory, applicationId, type); + } + + private QueueIndexingStrategy getIndexingStrategyForType(String type ) { return CpCollectionUtils.getIndexingStrategyForType(collectionSettingsFactory, applicationId, type); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index 4305aea..b8e8117 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -21,13 +21,13 @@ package org.apache.usergrid.corepersistence.asyncevents; import org.apache.usergrid.corepersistence.index.CollectionDeleteAction; -import org.apache.usergrid.corepersistence.index.IndexingStrategy; import org.apache.usergrid.corepersistence.index.ReIndexAction; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy; import java.util.UUID; @@ -55,7 +55,7 @@ public interface AsyncEventService extends ReIndexAction, CollectionDeleteAction * @param updatedAfter * @param */ - void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter, IndexingStrategy strategy); + void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter, QueueIndexingStrategy strategy); /** @@ -68,7 +68,7 @@ public interface AsyncEventService extends ReIndexAction, CollectionDeleteAction * @param entityId * @param newEdge */ - void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, IndexingStrategy indexingStrategy); + void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, QueueIndexingStrategy queueIndexingStrategy); /** * Queue the deletion of an edge http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 8257640..ec08dfe 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -29,7 +29,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.google.inject.Singleton; -import org.apache.usergrid.corepersistence.index.IndexingStrategy; import org.apache.usergrid.corepersistence.asyncevents.model.*; import org.apache.usergrid.corepersistence.index.*; import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; @@ -58,6 +57,7 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.persistence.queue.*; import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl; import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl; +import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; @@ -214,7 +214,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { start(); } - protected Histogram getMessageCycye() { + protected Histogram getMessageCycle() { return messageCycle; } @@ -278,28 +278,25 @@ public class AsyncEventServiceImpl implements AsyncEventService { * Offer the EntityIdScope to SQS */ protected void offer(final Serializable operation) { - offer(operation, AsyncEventQueueType.REGULAR, IndexingStrategy.DIRECT); + offer(operation, AsyncEventQueueType.REGULAR, QueueIndexingStrategy.DIRECT); } /** * Offer the EntityIdScope to SQS */ - protected void offer(final Serializable operation, IndexingStrategy indexingStrategy) { - offer(operation, AsyncEventQueueType.REGULAR, indexingStrategy); + protected void offer(final Serializable operation, QueueIndexingStrategy queueIndexingStrategy) { + offer(operation, AsyncEventQueueType.REGULAR, queueIndexingStrategy); } /** * Offer the EntityIdScope to SQS */ - private void offer(final Serializable operation, AsyncEventQueueType queueType, IndexingStrategy indexingStrategy) { + private void offer(final Serializable operation, AsyncEventQueueType queueType, QueueIndexingStrategy queueIndexingStrategy) { final Timer.Context timer = this.writeTimer.time(); try { //signal to SQS - Boolean async = null; - if (indexingStrategy != IndexingStrategy.DEFAULT) { - async = (indexingStrategy == IndexingStrategy.ASYNC); - } + Boolean async = (queueIndexingStrategy == QueueIndexingStrategy.ASYNC); getQueue(queueType).sendMessageToLocalRegion(operation, async); } catch (IOException e) { @@ -548,7 +545,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { @Override public void queueEntityIndexUpdate(final ApplicationScope applicationScope, - final Entity entity, long updatedAfter, IndexingStrategy indexingStrategy) { + final Entity entity, long updatedAfter, QueueIndexingStrategy queueIndexingStrategy) { if (logger.isTraceEnabled()) { @@ -561,7 +558,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { new EntityIdScope(applicationScope, entity.getId()), updatedAfter); - offer(event, indexingStrategy); + offer(event, queueIndexingStrategy); } @@ -599,14 +596,14 @@ public class AsyncEventServiceImpl implements AsyncEventService { public void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, - IndexingStrategy indexingStrategy) { + QueueIndexingStrategy queueIndexingStrategy) { if (logger.isTraceEnabled()) { logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}", newEdge.getType(), entityId.getUuid(), entityId.getType()); } - offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entityId, newEdge ), indexingStrategy); + offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entityId, newEdge ), queueIndexingStrategy); } @@ -710,7 +707,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { offerTopic( elasticsearchIndexEvent, queueType ); } - protected ElasticsearchIndexEvent getIndexOperationMessage(final IndexOperationMessage indexOperationMessage) { + protected ElasticsearchIndexEvent getESIndexEvent(final IndexOperationMessage indexOperationMessage) { final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java deleted file mode 100644 index 9123138..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.corepersistence.asyncevents.direct; - - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.*; -import java.util.function.Consumer; - -/** - * Bufferes events and dispatched then in batches. - * Ensures that the callback will be called at a min interval. - */ -public class BufferedQueueImpl<T> implements BufferedQueue<T> { - - private String fileName = "my_file_name.txt"; - private Consumer<List<T>> consumer; - - ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1); - - private final LinkedBlockingQueue<PendingDispatch> queue; - private final long intervalNanos; - private long timeOfLastDispatch = 0L; - - public BufferedQueueImpl(int size, long interval , TimeUnit intervalTimeUnit) { - - Runtime.getRuntime().addShutdownHook(new Thread(new DispatchTask())); - - this.intervalNanos = intervalTimeUnit.toNanos(interval); - threadPool.scheduleAtFixedRate(new DispatchTask(), intervalNanos,intervalNanos, TimeUnit.NANOSECONDS); - readBatchFile(); - queue = new LinkedBlockingQueue<>(size); - } - - public boolean offer(T t) { - PendingDispatch pd = new PendingDispatch(t); - if (timeOfLastDispatch + intervalNanos < System.nanoTime()) { - dispatchOne(pd); - return true; - } - try { - return queue.offer(pd, intervalNanos, TimeUnit.NANOSECONDS); - } catch (InterruptedException e) { - return false; - } - } - - public void setConsumer(Consumer<List<T>> consumer) { - this.consumer = consumer; - } - - - private void dispatchOne(PendingDispatch pd) { - List<PendingDispatch> messages = new ArrayList<>(); - messages.add(pd); - dispatchMessages(messages); - } - - protected void dispatchAll() { - if (!queue.isEmpty()) { - List<PendingDispatch> messages = new ArrayList<>(); - queue.drainTo(messages); - dispatchMessages(messages); - } - } - - private void dispatchMessages(List<PendingDispatch> messages) { - List<T> m = new ArrayList<>(); - for (PendingDispatch pd : messages) { - if (!pd.isCancelled()) { - m.add(pd.getWrapped()); - } - } - timeOfLastDispatch = System.nanoTime(); - Boolean sent = Boolean.TRUE; - try { - consumer.accept(m); - } catch (Exception e) { - sent = Boolean.FALSE; - } - for (PendingDispatch pd : messages) { - pd.setResult(sent); - } - } - - - public int size() { - return queue.size(); - } - - private void readBatchFile() { - - } - - - // - // Internal Helper classes - // - - - - private class PendingDispatch implements Future<Boolean> { - T wrapped; - boolean canceled; - boolean done; - Boolean result = null; - - PendingDispatch(T wrapped) { - this.wrapped = wrapped; - canceled = false; - done = false; - } - - T getWrapped() { - return wrapped; - } - - void setResult(Boolean b) { - result = b; - done = true; - synchronized (this) { - notify(); - } - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - canceled = true; - return canceled; - } - - @Override - public boolean isCancelled() { - return canceled; - } - - @Override - public boolean isDone() { - return done; - } - - @Override - public Boolean get() throws InterruptedException, ExecutionException { - while (!done) { - synchronized (this) { - wait(100); - } - } - return result; - } - - @Override - public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - if (!done) { - synchronized (this) { - wait(unit.toMillis(timeout)); - } - } - return result; - } - } - - - private class DispatchTask implements Runnable { - @Override - public void run() { - try { - dispatchAll(); - } catch (Throwable t) { - } - } - } - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java index f842cea..c4d28b3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java @@ -21,6 +21,9 @@ package org.apache.usergrid.corepersistence.asyncevents.direct; import java.util.function.Consumer; /** + * This is NOP buffer. An alternate implementation of this interface might buffer the + * events to smooth out 'bursts' + * * Created by peterajohnson on 10/27/17. */ public class BufferedQueueNOP<T> implements BufferedQueue<T> { http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java index 4dfce37..ec2b5ec 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java @@ -23,7 +23,7 @@ import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent; import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; import org.apache.usergrid.corepersistence.index.IndexProcessorFig; -import org.apache.usergrid.corepersistence.index.IndexingStrategy; +import org.apache.usergrid.corepersistence.util.CpCollectionUtils; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; @@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.map.MapManagerFactory; import org.apache.usergrid.persistence.queue.LegacyQueueFig; import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory; import org.apache.usergrid.persistence.queue.LegacyQueueMessage; +import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,17 +55,19 @@ public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl { private static final Logger logger = LoggerFactory.getLogger(DirectFirstEventServiceImpl.class); - private IndexingStrategy configIndexingStrategy = IndexingStrategy.ASYNC; + private QueueIndexingStrategy configQueueIndexingStrategy = QueueIndexingStrategy.ASYNC; private BufferedQueue<Serializable> bufferedBatchQueue = new BufferedQueueNOP<>(); public DirectFirstEventServiceImpl(LegacyQueueManagerFactory queueManagerFactory, IndexProcessorFig indexProcessorFig, IndexProducer indexProducer, MetricsFactory metricsFactory, EntityCollectionManagerFactory entityCollectionManagerFactory, IndexLocationStrategyFactory indexLocationStrategyFactory, EntityIndexFactory entityIndexFactory, EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, LegacyQueueFig queueFig, RxTaskScheduler rxTaskScheduler) { super(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler); - //bufferedBatchQueue = new BufferedQueueImpl<>(5000, 100, TimeUnit.MILLISECONDS); bufferedBatchQueue.setConsumer((c) -> { dispatchToES(c); }); - configIndexingStrategy = IndexingStrategy.get(queueFig.getQueueStrategy()); + configQueueIndexingStrategy = QueueIndexingStrategy.get(queueFig.getQueueStrategy()); + + boolean indexDebugMode = Boolean.valueOf(queueFig.getQueueDebugMode()); + CpCollectionUtils.setDebugMode(indexDebugMode); } @@ -82,8 +85,10 @@ public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl { // failed to dispatch send to SQS try { List<LegacyQueueMessage> indexedMessages = submitToIndex(result, false); + if (logger.isDebugEnabled()) { + logger.debug("Sent {} messages to ES ", indexedMessages.size()); + } } catch (Exception e) { - e.printStackTrace(); for (Serializable body : bodies) { super.offer(body); } @@ -123,16 +128,16 @@ public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl { } - protected void offer(final Serializable operation, IndexingStrategy indexingStrategy) { - if (shouldSendToDirectToES(indexingStrategy)) { + protected void offer(final Serializable operation, QueueIndexingStrategy queueIndexingStrategy) { + queueIndexingStrategy = resolveIndexingStrategy(queueIndexingStrategy); + if (queueIndexingStrategy.shouldSendDirectToES()) { List<LegacyQueueMessage> messages = getMessageArray(operation); List<IndexEventResult> result = callEventHandlers(messages); submitToIndex( result, false ); } - // only if single region. - if (shouldSendToAWS(indexingStrategy)) { - super.offer(operation, indexingStrategy); + if (queueIndexingStrategy.shouldSendToAWS()) { + super.offer(operation, queueIndexingStrategy); } } @@ -152,7 +157,7 @@ public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl { .map(indexEventResult -> { //record the cycle time - getMessageCycye().update(System.currentTimeMillis() - indexEventResult.getCreationTime()); + getMessageCycle().update(System.currentTimeMillis() - indexEventResult.getCreationTime()); // ingest each index op into our combined, single index op for the index producer if(indexEventResult.getIndexOperationMessage().isPresent()){ @@ -166,23 +171,24 @@ public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl { // dispatch to ES - ElasticsearchIndexEvent elasticsearchIndexEvent = getIndexOperationMessage(combined); + ElasticsearchIndexEvent elasticsearchIndexEvent = getESIndexEvent(combined); handleIndexOperation(elasticsearchIndexEvent); return queueMessages; } - private boolean shouldSendToDirectToES(IndexingStrategy indexingStrategy) { - if (indexingStrategy == IndexingStrategy.DEFAULT) { - indexingStrategy = configIndexingStrategy; - } - return (indexingStrategy == IndexingStrategy.DIRECT || indexingStrategy == IndexingStrategy.DIRECTONLY); - } - - private boolean shouldSendToAWS(IndexingStrategy indexingStrategy) { - if (indexingStrategy == IndexingStrategy.DEFAULT) { - indexingStrategy = configIndexingStrategy; + // If the collection has not defined an indexing strategy then use the default from the fig. + // only allow NOINDEX or DIRECTONLY when in debug mode + private QueueIndexingStrategy resolveIndexingStrategy(QueueIndexingStrategy queueIndexingStrategy) { + switch (queueIndexingStrategy) { + case CONFIG: + return configQueueIndexingStrategy; + case NOINDEX: + case DIRECTONLY: + if (!CpCollectionUtils.getDebugMode()) { + return configQueueIndexingStrategy; + } + default: + return queueIndexingStrategy; } - // and is in same region. - return (indexingStrategy != IndexingStrategy.DIRECTONLY); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java deleted file mode 100644 index 69c5445..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.corepersistence.index; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -/** - * This class describes the paths an index request can take - * between tomcat and ES. - * - * Created by peterajohnson on 10/30/17. - */ -public enum IndexingStrategy { - - DIRECTONLY("directonly"), // Index request is sent directly to ES and not to AWS - DIRECT("direct"), // Index request is sent directly to ES before sync ASW - SYNC("sync"), // Index request is sent via a sync AWS to ES - ASYNC("async"), // Index request is sent via an async AWS to ES - DEFAULT("default"); // Follow the default setting - - private String name; - - private static final Map<String,IndexingStrategy> NAME_MAP; - - static { - Map<String,IndexingStrategy> map = new HashMap<String,IndexingStrategy>(); - for (IndexingStrategy instance : IndexingStrategy.values()) { - map.put(instance.getName(),instance); - } - NAME_MAP = Collections.unmodifiableMap(map); - } - - IndexingStrategy(String name) { - this.name = name; - } - - public static IndexingStrategy get(String name) { - IndexingStrategy indexingStrategy = NAME_MAP.get(name); - if (indexingStrategy == null) { - return DEFAULT; - } - return indexingStrategy; - } - - - public String getName() { - return this.name; - } - -} - http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java index 13edb2c..34799bb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java @@ -50,6 +50,8 @@ public class Pipeline<InputType> { private final RequestCursor requestCursor; private int limit; + private boolean keepStaleEntries; + private String query; //Generics hell, intentionally without a generic, we check at the filter level private Observable currentObservable; @@ -58,7 +60,7 @@ public class Pipeline<InputType> { /** * Create our filter pipeline */ - public Pipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) { + public Pipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit, boolean keepStaleEntries, String query) { ValidationUtils.validateApplicationScope( applicationScope ); @@ -78,6 +80,9 @@ public class Pipeline<InputType> { final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() ); this.currentObservable = Observable.just( filter ); + + this.keepStaleEntries = keepStaleEntries; + this.query = query; } @@ -86,7 +91,7 @@ public class Pipeline<InputType> { - final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount ); + final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount, keepStaleEntries, query ); filter.setContext( context ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java index 018abb7..88b5001 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java @@ -39,14 +39,20 @@ public class PipelineContext { private final ApplicationScope applicationScope; private final RequestCursor requestCursor; private final int limit; + // An entry is stale if the ES version number is less than the Cassandra version number + // it can happen if ES was not updated or has yet to be updated. + private final boolean keepStaleEntries; + private String query; - public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, final int limit, final int id ) { + public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, final int limit, final int id, boolean keepStaleEntries, String query ) { this.applicationScope = applicationScope; this.requestCursor = requestCursor; this.limit = limit; this.id = id; + this.keepStaleEntries = keepStaleEntries; + this.query = query; } @@ -78,5 +84,18 @@ public class PipelineContext { return limit; } + /** + * return true if stales entries are not to be filtered out. + */ + public boolean getKeepStaleEntries() { + return keepStaleEntries; + } + + /** + * return the query string if any + */ + public String getQuery() { + return query; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java index 9354127..a3b6fd9 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java @@ -24,6 +24,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; import org.apache.usergrid.corepersistence.pipeline.Pipeline; import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate; +import org.apache.usergrid.corepersistence.service.CollectionSearch; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -33,12 +34,20 @@ public class CandidateBuilder { private final Pipeline<FilterResult<Candidate>> pipeline; private final FilterFactory filterFactory; + private CollectionSearch search; + + public CandidateBuilder(final Pipeline<FilterResult<Candidate>> pipeline, + final FilterFactory filterFactory) { + this(pipeline,filterFactory,null); + } - public CandidateBuilder( final Pipeline<FilterResult<Candidate>> pipeline, - final FilterFactory filterFactory ) { + public CandidateBuilder(final Pipeline<FilterResult<Candidate>> pipeline, + final FilterFactory filterFactory, + CollectionSearch search) { this.pipeline = pipeline; this.filterFactory = filterFactory; + this.search = search; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java index a7f9ad9..4f44ac4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java @@ -31,6 +31,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefRe import org.apache.usergrid.corepersistence.pipeline.read.collect.IdResumeFilter; import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector; import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate; +import org.apache.usergrid.corepersistence.service.CollectionSearch; import org.apache.usergrid.persistence.ConnectionRef; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -125,9 +126,16 @@ public class IdBuilder { final Pipeline<FilterResult<Candidate>> newFilter = pipeline.withFilter( filterFactory.searchCollectionFilter( ql, collectionName, entityType, analyzeOnly ) ); - return new CandidateBuilder( newFilter, filterFactory ); + return new CandidateBuilder( newFilter, filterFactory , null); } + public CandidateBuilder searchCollection(final String collectionName, final String ql, final CollectionSearch search ) { + + final Pipeline<FilterResult<Candidate>> newFilter = pipeline.withFilter( filterFactory.searchCollectionFilter( + ql, collectionName, search.getEntityType(), search.getAnalyzeOnly() ) ); + + return new CandidateBuilder( newFilter, filterFactory, search ); + } /** * Search all connections from our input Id and search their connections http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java index f1a44ea..624f9dc 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java @@ -44,6 +44,8 @@ public class PipelineBuilder { private Optional<String> cursor = Optional.absent(); private int limit = 10; private final FilterFactory filterFactory; + private boolean keepStaleEntries = false; + private String query = ""; /** @@ -81,6 +83,21 @@ public class PipelineBuilder { return this; } + /** + */ + public PipelineBuilder keepStaleEntries(final boolean keepStaleEntries){ + this.keepStaleEntries = keepStaleEntries; + return this; + } + + /** + */ + public PipelineBuilder query(final Optional<String> query){ + if (query.isPresent()) { + this.query = query.get(); + } + return this; + } /** * Set our start point in our graph traversal to the specified entity id. A 1.0 compatibility API. eventually this should be replaced with @@ -91,7 +108,7 @@ public class PipelineBuilder { */ @Deprecated public IdBuilder fromId(final Id entityId){ - Pipeline<FilterResult<Id>> pipeline = new Pipeline( applicationScope, this.cursor,limit ).withFilter( filterFactory.getEntityIdFilter( entityId ) ); + Pipeline<FilterResult<Id>> pipeline = new Pipeline( applicationScope, this.cursor,limit,keepStaleEntries,query ).withFilter( filterFactory.getEntityIdFilter( entityId ) ); return new IdBuilder( pipeline, filterFactory ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java index 7770436..20bcfe9 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java @@ -26,10 +26,10 @@ import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; import org.apache.usergrid.persistence.index.*; import org.apache.usergrid.persistence.index.impl.IndexProducer; import org.apache.usergrid.persistence.model.field.DistanceField; -import org.apache.usergrid.persistence.model.field.DoubleField; import org.apache.usergrid.persistence.model.field.EntityObjectField; import org.apache.usergrid.persistence.model.field.Field; import org.apache.usergrid.persistence.model.field.value.EntityObject; +import org.apache.usergrid.utils.DateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +74,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate this.entityIndexFactory = entityIndexFactory; this.indexLocationStrategyFactory = indexLocationStrategyFactory; this.indexProducer = indexProducer; + } @@ -96,6 +97,9 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate final EntityIndex applicationIndex = entityIndexFactory .createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) ); + boolean keepStaleEntries = pipelineContext.getKeepStaleEntries(); + String query = pipelineContext.getQuery(); + //buffer them to get a page size we can make 1 network hop final Observable<FilterResult<Entity>> searchIdSetObservable = candidateResultsObservable.buffer( pipelineContext.getLimit() ) @@ -119,7 +123,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate entitySet -> new EntityVerifier( applicationIndex.createBatch(), entitySet, candidateResults,indexProducer) ) - .doOnNext(entityCollector -> entityCollector.merge()) + .doOnNext(entityCollector -> entityCollector.merge(keepStaleEntries, query)) .flatMap(entityCollector -> Observable.from(entityCollector.getResults())) .map(entityFilterResult -> { final Entity entity = entityFilterResult.getValue(); @@ -246,10 +250,10 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate /** * Merge our candidates and our entity set into results */ - public void merge() { + public void merge(boolean keepStaleEntries, String query) { for ( final FilterResult<Candidate> candidateResult : candidateResults ) { - validate( candidateResult ); + validate( candidateResult , keepStaleEntries, query); } indexProducer.put(batch.build()).toBlocking().lastOrDefault(null); // want to rethrow if batch fails @@ -267,7 +271,23 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate } - private void validate( final FilterResult<Candidate> filterResult ) { + // Helper function to convert a UUID time stamp into a unix date + private Date UUIDTimeStampToDate(UUID uuid) { + long timeStamp = 0L; + // The UUID is supposed to be time based so this should always be '1' + // but this is just used for logging so we don't want to throw an error i it is misused. + if (uuid.version() == 1) { + // this is the difference between midnight October 15, 1582 UTC and midnight January 1, 1970 UTC as 100 nanosecond units + long epochDiff = 122192928000000000L; + // the UUID timestamp is in 100 nanosecond units. + // convert that to milliseconds + timeStamp = ((uuid.timestamp()-epochDiff)/10000); + } + return new Date(timeStamp); + } + + + private void validate( final FilterResult<Candidate> filterResult, boolean keepStaleEntries, String query ) { final Candidate candidate = filterResult.getValue(); final CandidateResult candidateResult = candidate.getCandidateResult(); @@ -297,18 +317,40 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate final UUID entityVersion = entity.getVersion(); final Id entityId = entity.getId(); + //entity is newer than ES version, could be a missed or slow index event + if ( UUIDComparator.staticCompare(entityVersion, candidateVersion) > 0 ) { + + Date candidateTimeStamp = UUIDTimeStampToDate(candidateVersion); + Date entityTimeStamp = UUIDTimeStampToDate(entityVersion); + + Map<String,String> fields = new HashMap<>(); + for (Field field : entity.getEntity().get().getFields()) { + fields.put(field.getName(),String.valueOf(field.getValue())); + } + + logger.warn( "Found stale entity on edge {} for entityId {} Entity version date = {}. Candidate version date = {}. Will be returned in result set = {} Query = [{}] Entity fields = {}", + searchEdge, + entityId.getUuid(), + DateUtils.instance.formatIso8601Date(entityTimeStamp), + DateUtils.instance.formatIso8601Date(candidateTimeStamp), + keepStaleEntries, + query, + fields + ); + + if (!keepStaleEntries) { + batch.deindex(searchEdge, entityId, candidateVersion); + return; + } + } - - - //entity is newer than ES version, could be an update or the entity is marked as deleted - if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || - !entity.getEntity().isPresent() || - entity.getStatus() == MvccEntity.Status.DELETED ) { + // The entity is marked as deleted + if (!entity.getEntity().isPresent() || entity.getStatus() == MvccEntity.Status.DELETED ) { // when updating entities, we don't delete previous versions from ES so this action is expected if(logger.isDebugEnabled()){ - logger.debug( "Deindexing stale entity on edge {} for entityId {} and version {}", + logger.debug( "Deindexing deleted entity on edge {} for entityId {} and version {}", searchEdge, entityId, entityVersion); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java index 6240028..6b6edfc 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java @@ -42,6 +42,7 @@ public class CollectionSearch { private final Optional<String> cursor; private Level level = Level.ALL; private boolean analyzeOnly; + private boolean keepStaleEntries; public CollectionSearch( final ApplicationScope applicationScope, final Id collectionOwnerId, final String @@ -103,4 +104,12 @@ public class CollectionSearch { public void setAnalyzeOnly(final boolean analyzeOnly){ this.analyzeOnly = analyzeOnly; } + + public boolean getKeepStaleEntries() { + return keepStaleEntries; + } + + public void setKeepStaleEntries(final boolean keepStaleEntries){ + this.keepStaleEntries = keepStaleEntries; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java index 7684050..e052e2e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java @@ -22,6 +22,7 @@ import org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder; import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder; import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; +import org.apache.usergrid.corepersistence.util.CpCollectionUtils; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Entity; @@ -58,8 +59,12 @@ public class CollectionServiceImpl implements CollectionService { final Optional<String> query = search.getQuery(); final IdBuilder pipelineBuilder = - pipelineBuilderFactory.create( applicationScope ).withCursor( search.getCursor() ) - .withLimit( search.getLimit() ).fromId( search.getCollectionOwnerId() ); + pipelineBuilderFactory.create( applicationScope ) + .withCursor( search.getCursor() ) + .withLimit( search.getLimit() ) + .keepStaleEntries(search.getKeepStaleEntries()) + .query(query) + .fromId( search.getCollectionOwnerId() ); final EntityBuilder results; @@ -68,7 +73,7 @@ public class CollectionServiceImpl implements CollectionService { results = pipelineBuilder.traverseCollection( collectionName ).loadEntities(); } else { - results = pipelineBuilder.searchCollection( collectionName, query.get(),search.getEntityType(), search.getAnalyzeOnly()).loadEntities(); + results = pipelineBuilder.searchCollection( collectionName, query.get(),search).loadEntities(); } @@ -81,7 +86,6 @@ public class CollectionServiceImpl implements CollectionService { final ApplicationScope applicationScope = search.getApplicationScope(); final String collectionName = search.getCollectionName(); - final Optional<String> query = search.getQuery(); final IdBuilder pipelineBuilder = pipelineBuilderFactory.create( applicationScope ).withCursor( search.getCursor() ) http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java index f38cefa..010af2b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java @@ -21,10 +21,11 @@ import org.apache.usergrid.corepersistence.index.CollectionSettings; import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory; import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl; -import org.apache.usergrid.corepersistence.index.IndexingStrategy; import org.apache.usergrid.persistence.*; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.persistence.queue.settings.IndexConsistency; +import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy; import java.util.*; @@ -42,38 +43,60 @@ public class CpCollectionUtils { public static final String SETTING_FIELDS = "fields"; public static final String SETTING_QUEUE_INDEX = "queueIndex"; + public static final String SETTING_INDEX_CONSISTENCY = "indexConsistency"; private static Set<String> VALID_SETTING_NAMES = new HashSet<>(); static { VALID_SETTING_NAMES.add(SETTING_FIELDS); VALID_SETTING_NAMES.add(SETTING_QUEUE_INDEX); + VALID_SETTING_NAMES.add(SETTING_INDEX_CONSISTENCY); } public static Set<String> getValidSettings() { return VALID_SETTING_NAMES; } - public static IndexingStrategy getIndexingStrategyForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) { + // When running in debug mode we all some normally invalid index settings + // like update C* but not ES. + private static boolean debugMode = false; + public static boolean getDebugMode() { + return debugMode; + } - IndexingStrategy indexingStrategy = IndexingStrategy.DEFAULT; - String indexing = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_QUEUE_INDEX); - if (indexing != null) { - indexingStrategy = IndexingStrategy.get(indexing); + public static void setDebugMode(boolean set) { + debugMode = set; + } + + public static Object validateValue(String name, Object value) { + if (SETTING_QUEUE_INDEX.equals(name)) { + return QueueIndexingStrategy.get(value.toString()).getName(); + } + if (SETTING_INDEX_CONSISTENCY.equals(name)) { + return IndexConsistency.get(value.toString()).getName(); } - return indexingStrategy; + return value; } - public static Boolean asyncIndexingForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) { + public static QueueIndexingStrategy getIndexingStrategyForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) { + QueueIndexingStrategy queueIndexingStrategy = QueueIndexingStrategy.CONFIG; String indexing = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_QUEUE_INDEX); - if ("async".equals(indexing)) { - return Boolean.TRUE; + if (indexing != null) { + queueIndexingStrategy = QueueIndexingStrategy.get(indexing); } - if ("sync".equals(indexing)) { - return Boolean.FALSE; + return queueIndexingStrategy; + } + + + public static IndexConsistency getIndexConsistencyForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) { + + IndexConsistency indexConsistency = IndexConsistency.STRICT; + String indexConsistencyString = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_INDEX_CONSISTENCY); + if ( indexConsistencyString != null) { + indexConsistency = IndexConsistency.get(indexConsistencyString); } - return null; + return indexConsistency; } public static boolean skipIndexingForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java index d062ef4..42afa67 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java @@ -138,7 +138,7 @@ public class CollectionDeleteTest extends AbstractCoreIT { logger.info("Created {} entities after delete time", ENTITIES_TO_ADD_AFTER_TIME); - app.waitForQueueDrainAndRefreshIndex(5000); + app.waitForQueueDrainAndRefreshIndex(15000); final CollectionDeleteRequestBuilder builder = collectionDeleteService.getBuilder() http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java index b444199..437f9bf 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java @@ -153,7 +153,6 @@ public interface EntityIndex extends CPManager { */ String[] getIndexes(); - /** * type of alias */ http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index 3d2f576..211cf70 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -773,6 +773,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { return Health.valueOf( chr.getStatus().name() ); } catch ( Exception ex ) { + ex.printStackTrace(); logger.error( "Error connecting to ElasticSearch", ex.getMessage() ); } @@ -859,7 +860,6 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { } - /** * Interface for operations. */ http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java index ac7d10d..902c5d3 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java @@ -484,6 +484,7 @@ public class EntityIndexTest extends BaseIT { } + @Test public void deleteVerification() throws Throwable { http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java index 4a12d14..4cb6f37 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java @@ -110,13 +110,12 @@ public interface LegacyQueueFig extends GuicyFig { @Default("900000") // 15 minutes int getMapMessageTimeout(); - @Key("usergrid.queue.is.async") - @Default("true") - boolean isAsyncQueue(); - - @Key("usergrid.queue.strategy") @Default("async") String getQueueStrategy(); + @Key("usergrid.queue.test") + @Default("false") + String getQueueDebugMode(); + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index bc9be57..b18411d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService; import com.amazonaws.ClientConfiguration; import com.amazonaws.services.sqs.model.*; +import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -541,10 +542,9 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { } } - @Override public <T extends Serializable> void sendMessageToAllRegions(final T body, Boolean async) throws IOException { - boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue(); + boolean sendAsync = async == null || async.booleanValue(); if (sendAsync) { sendMessageToAllRegionsAsync(body); } else { @@ -552,7 +552,6 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { } } - private <T extends Serializable> void sendMessageToAllRegionsSync(final T body) throws IOException { if ( sns == null ) { logger.error( "SNS client is null, perhaps it failed to initialize successfully" ); @@ -634,8 +633,9 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { @Override public void sendMessages( final List bodies ) throws IOException { + QueueIndexingStrategy queueIndexingStrategy = QueueIndexingStrategy.get(fig.getQueueStrategy()); for ( Object body : bodies ) { - if (fig.isAsyncQueue()) { + if (queueIndexingStrategy == QueueIndexingStrategy.ASYNC) { sendMessageToLocalRegionAsync((Serializable) body); } else { sendMessageToLocalRegionSync((Serializable) body); @@ -682,7 +682,7 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { @Override public <T extends Serializable> void sendMessageToLocalRegion(final T body, Boolean async) throws IOException { - boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue(); + boolean sendAsync = async.booleanValue(); if (sendAsync) { sendMessageToLocalRegionAsync(body); } else { http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java new file mode 100644 index 0000000..531716a --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.queue.settings; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * This class describes the consistency rules when returning results set between C* and ES + * + * Created by peterajohnson on 10/30/17. + */ +public enum IndexConsistency { + + STRICT("strict"), // Result canidate must be exact match to be returned in result set + LATEST("latest"); // Result canidate must be exact match OR most recent version to be returned in result set + + private String name; + + private static final Map<String,IndexConsistency> NAME_MAP; + + static { + Map<String,IndexConsistency> map = new HashMap<>(); + for (IndexConsistency instance : IndexConsistency.values()) { + map.put(instance.getName(),instance); + } + NAME_MAP = Collections.unmodifiableMap(map); + } + + IndexConsistency(String name) { + this.name = name; + } + + public static IndexConsistency get(String name) { + IndexConsistency queueIndexingStrategy = NAME_MAP.get(name); + if (queueIndexingStrategy == null) { + return LATEST; + } + return queueIndexingStrategy; + } + + + public String getName() { + return this.name; + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/QueueIndexingStrategy.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/QueueIndexingStrategy.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/QueueIndexingStrategy.java new file mode 100644 index 0000000..375de71 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/QueueIndexingStrategy.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.queue.settings; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * This class describes the paths an index request can take + * between tomcat and ES. + * + * Created by peterajohnson on 10/30/17. + */ +public enum QueueIndexingStrategy { + + NOINDEX("debug_noindex"), // Do not Index the entity (DEBUG only use for testing) + DIRECTONLY("debug_directonly"), // Index request is sent directly to ES and not to AWS + + DIRECT("direct"), // Index request is sent directly to ES before sync ASW + SYNC("sync"), // Index request is sent via a sync AWS to ES + ASYNC("async"), // Index request is sent via an async AWS to ES + CONFIG("config"); // Follow the default setting of the fig + + private String name; + + private static final Map<String,QueueIndexingStrategy> NAME_MAP; + + static { + Map<String,QueueIndexingStrategy> map = new HashMap<String,QueueIndexingStrategy>(); + for (QueueIndexingStrategy instance : QueueIndexingStrategy.values()) { + map.put(instance.getName(),instance); + } + NAME_MAP = Collections.unmodifiableMap(map); + } + + QueueIndexingStrategy(String name) { + this.name = name; + } + + public static QueueIndexingStrategy get(String name) { + QueueIndexingStrategy queueIndexingStrategy = NAME_MAP.get(name); + if (queueIndexingStrategy == null) { + return CONFIG; + } + return queueIndexingStrategy; + } + + + public String getName() { + return this.name; + } + + public boolean shouldSendDirectToES() { + return (this == QueueIndexingStrategy.DIRECT || this == QueueIndexingStrategy.DIRECTONLY); + } + + public boolean shouldSendToAWS() { + // and is in same region. + return (this != QueueIndexingStrategy.DIRECTONLY && this != QueueIndexingStrategy.NOINDEX); + } + +} + http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPInterceptor.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPInterceptor.java b/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPInterceptor.java new file mode 100644 index 0000000..77e06ed --- /dev/null +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPInterceptor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.rest.interceptors; + +import org.glassfish.jersey.server.ContainerRequest; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import javax.inject.*; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.ext.*; +import javax.ws.rs.ext.Provider; + +/** + * If the request had an ACCEPT_ENCODING header containing 'gzip' then + * gzip the response and add CONTENT_ENCODING gzip header + * + * If the request had an CONTENT_ENCODING header containing 'gzip' then + * unzip the request and remove the CONTENT_ENCODING gzip header + * + * Created by peterajohnson on 11/1/17. + */ +@Provider +public class GZIPInterceptor implements ReaderInterceptor, WriterInterceptor { + + final private static String GZIP = "gzip"; + @Inject + private javax.inject.Provider<ContainerRequest> requestProvider; + + @Override + public void aroundWriteTo(WriterInterceptorContext context) throws IOException,WebApplicationException { + ContainerRequest request = requestProvider.get(); + + if (request != null) { + List<String> aeHeaders = request.getRequestHeader(HttpHeaders.ACCEPT_ENCODING); + if (aeHeaders != null && aeHeaders.size() > 0) { + String acceptEncodingHeader = aeHeaders.get(0); + if (acceptEncodingHeader.contains(GZIP)) { + OutputStream outputStream = context.getOutputStream(); + context.setOutputStream(new GZIPOutputStream(outputStream)); + context.getHeaders().putSingle(HttpHeaders.CONTENT_ENCODING, GZIP); + } + } + } + context.proceed(); + } + + @Override + public Object aroundReadFrom(ReaderInterceptorContext context) throws IOException, WebApplicationException { + String encoding = context.getHeaders().getFirst(HttpHeaders.CONTENT_ENCODING); + if (GZIP.equalsIgnoreCase(encoding)) { + GZIPInputStream is = new GZIPInputStream(context.getInputStream()); + context.getHeaders().remove(HttpHeaders.CONTENT_ENCODING); + context.setInputStream(is); + } + + return context.proceed(); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java b/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java deleted file mode 100644 index f562475..0000000 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.usergrid.rest.interceptors; - -import org.glassfish.jersey.server.ContainerRequest; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.List; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - -import javax.inject.*; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.ext.*; -import javax.ws.rs.ext.Provider; - -/** - * If the request had an ACCEPT_ENCODING header containing 'gzip' then - * gzip the response and add CONTENT_ENCODING gzip header - * - * * If the request had an CONTENT_ENCODING header containing 'gzip' then - * unzip the request and remove the CONTENT_ENCODING gzip header - * Created by peterajohnson on 11/1/17. - */ -@Provider -public class GZIPWriterInterceptor implements ReaderInterceptor, WriterInterceptor { - - final private static String GZIP = "gzip"; - @Inject - private javax.inject.Provider<ContainerRequest> requestProvider; - - @Override - public void aroundWriteTo(WriterInterceptorContext context) throws IOException,WebApplicationException { - ContainerRequest request = requestProvider.get(); - - if (request != null) { - List<String> aeHeaders = request.getRequestHeader(HttpHeaders.ACCEPT_ENCODING); - if (aeHeaders != null && aeHeaders.size() > 0) { - String acceptEncodingHeader = aeHeaders.get(0); - if (acceptEncodingHeader.contains(GZIP)) { - OutputStream outputStream = context.getOutputStream(); - context.setOutputStream(new GZIPOutputStream(outputStream)); - context.getHeaders().putSingle(HttpHeaders.CONTENT_ENCODING, GZIP); - } - } - } - context.proceed(); - } - - @Override - public Object aroundReadFrom(ReaderInterceptorContext context) throws IOException, WebApplicationException { - String encoding = context.getHeaders().getFirst(HttpHeaders.CONTENT_ENCODING); - if (GZIP.equalsIgnoreCase(encoding)) { - GZIPInputStream is = new GZIPInputStream(context.getInputStream()); - context.getHeaders().remove(HttpHeaders.CONTENT_ENCODING); - context.setInputStream(is); - } - - return context.proceed(); - } -}
