http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java index 107b2a0..9ed254c 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java @@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.collection.serialization.impl; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -43,6 +45,7 @@ import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import com.fasterxml.uuid.UUIDComparator; import com.google.inject.Inject; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; @@ -65,15 +68,15 @@ public abstract class MvccLogEntrySerializationStrategyImplTest { private MvccLogEntrySerializationStrategy logEntryStrategy; + @Before - public void wireLogEntryStrategy(){ + public void wireLogEntryStrategy() { logEntryStrategy = getLogEntryStrategy(); } /** * Get the log entry strategy from - * @return */ protected abstract MvccLogEntrySerializationStrategy getLogEntryStrategy(); @@ -181,6 +184,131 @@ public abstract 
class MvccLogEntrySerializationStrategyImplTest { } + @Test + public void getReversedEntries() throws ConnectionException { + + final Id applicationId = new SimpleId( "application" ); + + ApplicationScope context = new ApplicationScopeImpl( applicationId ); + + + final Id id = new SimpleId( "test" ); + + int count = 10; + + final UUID[] versions = new UUID[count]; + final Stage COMPLETE = Stage.COMPLETE; + final MvccLogEntry[] entries = new MvccLogEntry[count]; + + + for ( int i = 0; i < count; i++ ) { + versions[i] = UUIDGenerator.newTimeUUID(); + + entries[i] = new MvccLogEntryImpl( id, versions[i], COMPLETE, MvccLogEntry.State.COMPLETE ); + logEntryStrategy.write( context, entries[i] ).execute(); + + //Read it back + + MvccLogEntry returned = + logEntryStrategy.load( context, Collections.singleton( id ), versions[i] ).getMaxVersion( id ); + + assertNotNull( "Returned value should not be null", returned ); + + assertEquals( "Returned should equal the saved", entries[i], returned ); + } + + + final UUID[] assertVersions = Arrays.copyOf( versions, versions.length ); + + Arrays.sort( assertVersions, ( v1, v2 ) -> UUIDComparator.staticCompare( v1, v2 ) * -1 ); + + //now do a range scan from the end + + final int half = count / 2; + + final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, versions[0], half ); + + assertEquals( half, results.size() ); + + for ( int i = 0; i < count / 2; i++ ) { + final MvccLogEntry saved = entries[i]; + final MvccLogEntry returned = results.get( i ); + + assertEquals( "Entry was not equal to the saved value", saved, returned ); + } + + + //now get the next batch + final List<MvccLogEntry> results2 = logEntryStrategy.loadReversed( context, id, versions[half], count ); + + assertEquals( half, results2.size() ); + + for ( int i = 0; i < half; i++ ) { + final MvccLogEntry saved = entries[half + i]; + final MvccLogEntry returned = results2.get( i ); + + assertEquals( "Entry was not equal to the saved value", saved, 
returned ); + } + + + //now delete them all and ensure we get no results back + for ( int i = 0; i < count; i++ ) { + logEntryStrategy.delete( context, id, versions[i] ).execute(); + } + + final List<MvccLogEntry> results3 = logEntryStrategy.loadReversed( context, id, null, versions.length ); + + assertEquals( "All log entries were deleted", 0, results3.size() ); + } + + + @Test + public void createAndDeleteEntries() throws ConnectionException { + + final Id applicationId = new SimpleId( "application" ); + + ApplicationScope context = new ApplicationScopeImpl( applicationId ); + + + final Id id = new SimpleId( "test" ); + + + final int size = 10; + + final List<MvccLogEntry> savedEntries = new ArrayList<>( size ); + + for ( int i = 0; i < size; i++ ) { + final UUID version = UUIDGenerator.newTimeUUID(); + MvccLogEntry saved = new MvccLogEntryImpl( id, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE ); + logEntryStrategy.write( context, saved ).execute(); + + savedEntries.add( saved ); + } + + //now test we get them all back + + final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, null, size ); + + assertEquals( size, results.size() ); + + //assert they're the same + for ( int i = 0; i < size; i++ ) { + assertEquals( savedEntries.get( i ), results.get( i ) ); + } + + //now delete them all + + for ( final MvccLogEntry mvccLogEntry : savedEntries ) { + logEntryStrategy.delete( context, id, mvccLogEntry.getVersion() ).execute(); + } + + //now get them back, should be empty + final List<MvccLogEntry> emptyResults = logEntryStrategy.loadReversed( context, id, null, size ); + + assertEquals( 0, emptyResults.size() ); + } + + @Test( expected = NullPointerException.class ) public void writeParamsNoContext() throws ConnectionException { logEntryStrategy.write( null, mock( MvccLogEntry.class ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java index 93de9d4..cd6ad3d 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java @@ -20,15 +20,11 @@ package org.apache.usergrid.persistence.collection.util;/* import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.TreeMap; import java.util.UUID; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.mvcc.entity.Stage; import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl; @@ -50,7 +46,11 @@ import static org.mockito.Mockito.when; public class LogEntryMock { - private final TreeMap<UUID, MvccLogEntry> entries = new TreeMap<>(ReversedUUIDComparator.INSTANCE); + private final TreeMap<UUID, MvccLogEntry> reversedEntries = + new TreeMap<>( ( o1, o2 ) -> UUIDComparator.staticCompare( o1, o2 ) * -1 ); + + private final TreeMap<UUID, MvccLogEntry> entries = + new TreeMap<>( ( o1, o2 ) -> UUIDComparator.staticCompare( o1, o2 ) ); private final Id entityId; @@ -61,78 +61,92 @@ public class LogEntryMock { * @param entityId The entity Id to use * @param versions The versions to use */ - private LogEntryMock(final Id entityId, final 
List<UUID> versions ) { + private LogEntryMock( final Id entityId, final List<UUID> versions ) { this.entityId = entityId; - for ( UUID version: versions) { - entries.put( version, new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE ) ); + for ( UUID version : versions ) { + final MvccLogEntry mvccLogEntry = + new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE ); + reversedEntries.put( version, mvccLogEntry ); + entries.put( version, mvccLogEntry ); } } /** * Init the mock with the given data structure + * * @param logEntrySerializationStrategy The strategy to moc - * @param scope - * @throws ConnectionException */ - private void initMock( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope ) + private void initMock( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, + final ApplicationScope scope ) - throws ConnectionException { + throws ConnectionException { //wire up the mocks - when(logEntrySerializationStrategy.load( same( scope ), same( entityId ), any(UUID.class), any(Integer.class) )).thenAnswer( new Answer<List<MvccLogEntry>>() { - + when( logEntrySerializationStrategy + .load( same( scope ), same( entityId ), any( UUID.class ), any( Integer.class ) ) ).thenAnswer( - @Override - public List<MvccLogEntry> answer( final InvocationOnMock invocation ) throws Throwable { + invocation -> { final UUID startVersion = ( UUID ) invocation.getArguments()[2]; - final int count = (Integer)invocation.getArguments()[3]; + final int count = ( Integer ) invocation.getArguments()[3]; final List<MvccLogEntry> results = new ArrayList<>( count ); - final Iterator<MvccLogEntry> itr = entries.tailMap( startVersion, true ).values().iterator(); + final Iterator<MvccLogEntry> itr = reversedEntries.tailMap( startVersion, true ).values().iterator(); - for(int i = 0; i < count && itr.hasNext(); i ++){ + for ( int i = 0; i < count && itr.hasNext(); i++ ) { 
results.add( itr.next() ); } return results; - } - } ); - } + } ); - /** - * Get the entry at the specified index from high to low - * @param index - * @return - */ - public MvccLogEntry getEntryAtIndex(final int index){ + //mock in reverse - final Iterator<MvccLogEntry> itr = entries.values().iterator(); + when( logEntrySerializationStrategy + .loadReversed( same( scope ), same( entityId ), any( UUID.class ), any( Integer.class ) ) ).thenAnswer( - for(int i = 0; i < index; i ++){ - itr.next(); - } + invocation -> { + final UUID startVersion = ( UUID ) invocation.getArguments()[2]; + final int count = ( Integer ) invocation.getArguments()[3]; + + + final List<MvccLogEntry> results = new ArrayList<>( count ); + + final Iterator<MvccLogEntry> itr; + + if ( startVersion == null ) { + itr = entries.values().iterator(); + } + else { + itr = entries.tailMap( startVersion, true ).values().iterator(); + } + + for ( int i = 0; i < count && itr.hasNext(); i++ ) { + results.add( itr.next() ); + } - return itr.next(); + + return results; + } ); } /** - * * @param logEntrySerializationStrategy The mock to use * @param scope The scope to use * @param entityId The entityId to use * @param versions The versions to mock - * @throws ConnectionException */ - public static LogEntryMock createLogEntryMock(final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope,final Id entityId, final List<UUID> versions ) + public static LogEntryMock createLogEntryMock( + final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope, + final Id entityId, final List<UUID> versions ) - throws ConnectionException { + throws ConnectionException { LogEntryMock mock = new LogEntryMock( entityId, versions ); mock.initMock( logEntrySerializationStrategy, scope ); @@ -141,19 +155,12 @@ public class LogEntryMock { } - public Collection<MvccLogEntry> getEntries() { - return entries.values(); + public Collection<MvccLogEntry> 
getReversedEntries() { + return reversedEntries.values(); } - private static final class ReversedUUIDComparator implements Comparator<UUID> { - - public static final ReversedUUIDComparator INSTANCE = new ReversedUUIDComparator(); - - - @Override - public int compare( final UUID o1, final UUID o2 ) { - return UUIDComparator.staticCompare( o1, o2 ) * -1; - } + public Collection<MvccLogEntry> getEntries() { + return entries.values(); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/resources/log4j.properties b/stack/corepersistence/collection/src/test/resources/log4j.properties index acf5c39..7b55cf8 100644 --- a/stack/corepersistence/collection/src/test/resources/log4j.properties +++ b/stack/corepersistence/collection/src/test/resources/log4j.properties @@ -33,4 +33,5 @@ log4j.logger.cassandra.db=ERROR #log4j.logger.org.apache.usergrid=DEBUG #log4j.logger.org.apache.usergrid.persistence.collection=TRACE +log4j.logger.org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact=TRACE http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java new file mode 100644 index 0000000..c653458 --- /dev/null +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.core.executor; + + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * A task executor that allows you to submit tasks + */ +public class TaskExecutorFactory { + + + /** + * Create a task executor + * @param schedulerName + * @param maxThreadCount + * @param maxQueueSize + * @return + */ + public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount, + final int maxQueueSize ) { + + + final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize ); + + + final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, schedulerName, maxThreadCount ); + + + return threadPool; + } + + + /** + * Create a thread pool that will reject work if our audit tasks become overwhelmed + */ + private static final class MaxSizeThreadPool extends ThreadPoolExecutor { + + public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) { + super( maxPoolSize, 
maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( poolName ) ); + } + } + + + /** + * Thread factory that will name and count threads for easier debugging + */ + private static final class CountingThreadFactory implements ThreadFactory { + + private final AtomicLong threadCounter = new AtomicLong(); + private final String poolName; + + + private CountingThreadFactory( final String poolName ) {this.poolName = poolName;} + + + @Override + public Thread newThread( final Runnable r ) { + final long newValue = threadCounter.incrementAndGet(); + + final String threadName = poolName + "-" + newValue; + + Thread t = new Thread( r, threadName ); + + //set it to be a daemon thread so it doesn't block shutdown + t.setDaemon( true ); + + return t; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java index d6cc5e8..f28e190 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java @@ -20,8 +20,6 @@ package org.apache.usergrid.persistence.core.rx; -import org.apache.usergrid.persistence.core.task.Task; - import rx.Scheduler; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java deleted file mode 100644 index 9007167..0000000 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.usergrid.persistence.core.task; - - -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; - -import javax.annotation.Nullable; - -import org.slf4j.Logger; - -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - - -/** - * Implementation of the task executor with a unique name and size - */ -public class NamedTaskExecutorImpl implements TaskExecutor { - - private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class ); - - private final ListeningExecutorService executorService; - - private final String name; - private final int poolSize; - - - /** - * @param name The name of this instance of the task executor - * @param poolSize The size of the pool. This is the number of concurrent tasks that can execute at once. 
- * @param queueLength The length of tasks to keep in the queue - */ - public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) { - Preconditions.checkNotNull( name ); - Preconditions.checkArgument( name.length() > 0, "name must have a length" ); - Preconditions.checkArgument( poolSize > -1, "poolSize must be > than -1" ); - Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" ); - - this.name = name; - this.poolSize = poolSize; - - // The user has chosen to disable asynchronous execution, - // to create an executor service that will reject all requests - if ( poolSize == 0 ) { - executorService = MoreExecutors.listeningDecorator( new RejectingExecutorService()); - } - - //queue executions as normal - else { - final BlockingQueue<Runnable> queue = queueLength > 0 - ? new ArrayBlockingQueue<Runnable>(queueLength) : new SynchronousQueue<Runnable>(); - - executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) ); - } - } - - - @Override - public <V> ListenableFuture<V> submit( final Task<V> task ) { - - final ListenableFuture<V> future; - - try { - future = executorService.submit( task ); - - // Log our success or failures for debugging purposes - Futures.addCallback( future, new TaskFutureCallBack<V>( task ) ); - } - catch ( RejectedExecutionException ree ) { - return Futures.immediateFuture( task.rejected()); - } - - return future; - } - - - @Override - public void shutdown() { - this.executorService.shutdownNow(); - } - - - /** - * Callback for when the task succeeds or fails. 
- */ - private static final class TaskFutureCallBack<V> implements FutureCallback<V> { - - private final Task<V> task; - - - private TaskFutureCallBack( Task<V> task ) { - this.task = task; - } - - - @Override - public void onSuccess( @Nullable final V result ) { - LOG.debug( "Successfully completed task ", task ); - } - - - @Override - public void onFailure( final Throwable t ) { - LOG.error( "Unable to execute task. Exception is ", t ); - - task.exceptionThrown( t ); - } - } - - - /** - * Create a thread pool that will reject work if our audit tasks become overwhelmed - */ - private final class MaxSizeThreadPool extends ThreadPoolExecutor { - - public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) { - - super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ), - new RejectedHandler() ); - } - } - - - /** - * Thread factory that will name and count threads for easier debugging - */ - private final class CountingThreadFactory implements ThreadFactory { - - private final AtomicLong threadCounter = new AtomicLong(); - - - @Override - public Thread newThread( final Runnable r ) { - final long newValue = threadCounter.incrementAndGet(); - - Thread t = new Thread( r, name + "-" + newValue ); - - t.setDaemon( true ); - - return t; - } - } - - - /** - * The handler that will handle rejected executions and signal the interface - */ - private final class RejectedHandler implements RejectedExecutionHandler { - - - @Override - public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) { - LOG.warn( "{} task queue full, rejecting task {}", name, r ); - - throw new RejectedExecutionException( "Unable to run task, queue full" ); - } - - } - - - /** - * Executor implementation that simply rejects all incoming tasks - */ - private static final class RejectingExecutorService implements ExecutorService{ - - @Override - public void shutdown() { - - } - - - @Override - public List<Runnable> shutdownNow() { - return 
Collections.EMPTY_LIST; - } - - - @Override - public boolean isShutdown() { - return false; - } - - - @Override - public boolean isTerminated() { - return false; - } - - - @Override - public boolean awaitTermination( final long timeout, final TimeUnit unit ) - throws InterruptedException { - return false; - } - - - @Override - public <T> Future<T> submit( final Callable<T> task ) { - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - - - @Override - public <T> Future<T> submit( final Runnable task, final T result ) { - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - - - @Override - public Future<?> submit( final Runnable task ) { - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - - - @Override - public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks ) - throws InterruptedException { - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - - - @Override - public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks, - final long timeout, final TimeUnit unit ) throws InterruptedException { - - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - - - @Override - public <T> T invokeAny( final Collection<? extends Callable<T>> tasks ) - throws InterruptedException, ExecutionException { - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - - - @Override - public <T> T invokeAny( final Collection<? 
extends Callable<T>> tasks, final long timeout, - final TimeUnit unit ) throws InterruptedException, ExecutionException, TimeoutException { - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - - - @Override - public void execute( final Runnable command ) { - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java deleted file mode 100644 index 5582161..0000000 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.usergrid.persistence.core.task; - - -import java.util.concurrent.Callable; - - -/** - * The task to execute - */ -public interface Task<V> extends Callable<V> { - - - /** - * Invoked when this task throws an uncaught exception. - * @param throwable - */ - void exceptionThrown(final Throwable throwable); - - /** - * Invoked when we weren't able to run this task by the the thread attempting to schedule the task. - * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the - * task in the scheduling thread. Note that this has performance implications to the user. If you can drop the - * request and process later (lazy repair for instance ) do so. - * - */ - V rejected(); - - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java deleted file mode 100644 index 5728d2e..0000000 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.core.task; - - -import com.google.common.util.concurrent.ListenableFuture; - - -/** - * An interface for execution of tasks - */ -public interface TaskExecutor { - - /** - * Submit the task asynchronously - * @param task - */ - public <V> ListenableFuture<V> submit( Task<V> task ); - - /** - * Stop the task executor without waiting for scheduled threads to run - */ - public void shutdown(); - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java deleted file mode 100644 index 4f95918..0000000 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.core.task; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; - - -/** - * Tests for the namedtask execution impl - */ -public class NamedTaskExecutorImplTest { - - - @Test - public void jobSuccess() throws InterruptedException { - final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 ); - - final CountDownLatch exceptionLatch = new CountDownLatch( 0 ); - final CountDownLatch rejectedLatch = new CountDownLatch( 0 ); - final CountDownLatch runLatch = new CountDownLatch( 1 ); - - final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {}; - - executor.submit( task ); - - - runLatch.await( 1000, TimeUnit.MILLISECONDS ); - - assertEquals( 0l, exceptionLatch.getCount() ); - - assertEquals( 0l, rejectedLatch.getCount() ); - } - - - @Test - public void exceptionThrown() throws InterruptedException { - final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 ); - - final CountDownLatch exceptionLatch = new CountDownLatch( 1 ); - final CountDownLatch rejectedLatch = new CountDownLatch( 0 ); - final CountDownLatch runLatch = new CountDownLatch( 1 ); - - final RuntimeException re = new RuntimeException( "throwing exception" ); - - final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) { - @Override - public Void 
call() throws Exception { - super.call(); - throw re; - } - }; - - executor.submit( task ); - - - runLatch.await( 1000, TimeUnit.MILLISECONDS ); - exceptionLatch.await( 1000, TimeUnit.MILLISECONDS ); - - assertSame( re, task.exceptions.get( 0 ) ); - - - assertEquals( 0l, rejectedLatch.getCount() ); - } - - - @Test - public void noCapacity() throws InterruptedException { - final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 ); - - final CountDownLatch exceptionLatch = new CountDownLatch( 0 ); - final CountDownLatch rejectedLatch = new CountDownLatch( 0 ); - final CountDownLatch runLatch = new CountDownLatch( 1 ); - - - final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) { - @Override - public Void call() throws Exception { - super.call(); - - //park this thread so it takes up a task and the next is rejected - final Object mutex = new Object(); - - synchronized ( mutex ) { - mutex.wait(); - } - - return null; - } - }; - - executor.submit( task ); - - - runLatch.await( 1000, TimeUnit.MILLISECONDS ); - - //now submit the second task - - - final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 ); - final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 ); - final CountDownLatch secondRunLatch = new CountDownLatch( 1 ); - - - final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {}; - - executor.submit( testTask ); - - - secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS ); - - //if we get here we've been rejected, just double check we didn't run - - assertEquals( 1l, secondRunLatch.getCount() ); - assertEquals( 0l, secondExceptionLatch.getCount() ); - } - - - @Test - public void noCapacityWithQueue() throws InterruptedException { - - final int threadPoolSize = 1; - final int queueSize = 10; - - final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize ); - - final CountDownLatch exceptionLatch = new 
CountDownLatch( 0 ); - final CountDownLatch rejectedLatch = new CountDownLatch( 0 ); - final CountDownLatch runLatch = new CountDownLatch( 1 ); - - int iterations = threadPoolSize + queueSize; - - for(int i = 0; i < iterations; i ++) { - - final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) { - @Override - public Void call() throws Exception { - super.call(); - - //park this thread so it takes up a task and the next is rejected - final Object mutex = new Object(); - - synchronized ( mutex ) { - mutex.wait(); - } - - return null; - } - }; - executor.submit( task ); - } - - - - runLatch.await( 1000, TimeUnit.MILLISECONDS ); - - //now submit the second task - - - final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 ); - final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 ); - final CountDownLatch secondRunLatch = new CountDownLatch( 1 ); - - - final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {}; - - executor.submit( testTask ); - - - secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS ); - - //if we get here we've been rejected, just double check we didn't run - - assertEquals( 1l, secondRunLatch.getCount() ); - assertEquals( 0l, secondExceptionLatch.getCount() ); - } - - - @Test - public void rejectingTaskExecutor() throws InterruptedException { - - final int threadPoolSize = 0; - final int queueSize = 0; - - final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize ); - - final CountDownLatch exceptionLatch = new CountDownLatch( 0 ); - final CountDownLatch rejectedLatch = new CountDownLatch( 1 ); - final CountDownLatch runLatch = new CountDownLatch( 0 ); - - - //now submit the second task - - - - final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {}; - - executor.submit( testTask ); - - - //should be immediately rejected - rejectedLatch.await( 1000, TimeUnit.MILLISECONDS ); - 
- //if we get here we've been rejected, just double check we didn't run - - assertEquals( 0l, exceptionLatch.getCount() ); - assertEquals( 0l, runLatch.getCount() ); - } - - - private static abstract class TestTask<V> implements Task<V> { - - private final List<Throwable> exceptions; - private final CountDownLatch exceptionLatch; - private final CountDownLatch rejectedLatch; - private final CountDownLatch runLatch; - - - private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch, - final CountDownLatch runLatch ) { - this.exceptionLatch = exceptionLatch; - this.rejectedLatch = rejectedLatch; - this.runLatch = runLatch; - - this.exceptions = new ArrayList<>(); - } - - - - @Override - public void exceptionThrown( final Throwable throwable ) { - this.exceptions.add( throwable ); - exceptionLatch.countDown(); - } - - - @Override - public V rejected() { - rejectedLatch.countDown(); - return null; - } - - - @Override - public V call() throws Exception { - runLatch.countDown(); - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java index 894e74a..2a153e2 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java @@ -31,46 +31,46 @@ import org.safehaus.guicyfig.Key; public interface GraphFig extends GuicyFig { - public static final String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size"; + String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size"; - public static final String 
REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size"; + String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size"; /** * The size of the shards. This is approximate, and should be set lower than what you would like your max to be */ - public static final String SHARD_SIZE = "usergrid.graph.shard.size"; + String SHARD_SIZE = "usergrid.graph.shard.size"; /** * Number of shards we can cache. */ - public static final String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size"; + String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size"; /** * Get the cache timeout. The local cache will exist for this amount of time max (in millis). */ - public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout"; + String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout"; /** * Number of worker threads to refresh the cache */ - public static final String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count"; + String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count"; /** * The size of the worker count for shard auditing */ - public static final String SHARD_AUDIT_QUEUE_SIZE = "usergrid.graph.shard.audit.worker.queue.size"; + String SHARD_AUDIT_QUEUE_SIZE = "usergrid.graph.shard.audit.worker.queue.size"; /** * The size of the worker count for shard auditing */ - public static final String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count"; + String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count"; - public static final String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance"; + String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance"; /** @@ -80,14 +80,14 @@ public interface GraphFig extends GuicyFig { * Note that you should also pad this for node clock drift. 
A good value for this would be 2x the shard cache * timeout + 30 seconds, assuming you have NTP and allow a max drift of 30 seconds */ - public static final String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta"; + String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta"; - public static final String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count"; + String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count"; - public static final String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval"; + String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval"; - public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size"; + String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size"; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java index 4c38c13..987a36c 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java @@ -64,23 +64,23 @@ public interface GraphManager extends CPManager { /** - * @param edge The edge to delete + * @param edge Mark the edge as deleted in the graph * * * EdgeDelete the edge. Implementation should also delete the incoming (reversed) edge. Only deletes the specific version */ - Observable<Edge> deleteEdge( Edge edge ); + Observable<Edge> markEdge( Edge edge ); /** * - * Remove the node from the graph. 
+ * Mark the node as removed from the graph. * * @param node The node to remove * @param timestamp The timestamp to apply the delete operation. Any edges connected to this node with a timestmap * <= the specified time will be removed from the graph * @return */ - Observable<Id> deleteNode(Id node, long timestamp); + Observable<Id> markNode( Id node, long timestamp ); /** * Get all versions of this edge where versions <= max version http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java index eb49711..d73f767 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java @@ -36,7 +36,7 @@ public interface GraphManagerFactory * * @param collectionScope The context to use when creating the graph manager */ - public GraphManager createEdgeManager( ApplicationScope collectionScope ); + GraphManager createEdgeManager( ApplicationScope collectionScope ); void invalidate(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java index 114440f..38f62b5 100644 --- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java @@ -59,6 +59,12 @@ public interface SearchByEdge { long getMaxTimestamp(); /** + * Return true if we should filter edges marked for deletion + * @return + */ + boolean filterMarked(); + + /** * The optional start parameter. All edges emitted with be > the specified start edge. * This is useful for paging. Simply use the last value returned in the previous call in the start parameter * @return http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java index 5e47ae0..f213b00 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java @@ -65,6 +65,12 @@ public interface SearchByEdgeType { */ Order getOrder(); + /** + * Return true to filter marked edges from the results + * @return + */ + boolean filterMarked(); + /** * Options for ordering. By default, we want to perform descending for common use cases and read speed. 
This is our our data http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java index 6cdaef0..4b628d1 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java @@ -28,8 +28,6 @@ import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin; import org.apache.usergrid.persistence.core.migration.data.MigrationRelationship; import org.apache.usergrid.persistence.core.migration.data.VersionedMigrationSet; import org.apache.usergrid.persistence.core.migration.schema.Migration; -import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl; -import org.apache.usergrid.persistence.core.task.TaskExecutor; import org.apache.usergrid.persistence.graph.GraphFig; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteListener; @@ -196,15 +194,6 @@ public abstract class GraphModule extends AbstractModule { } - @Inject - @Singleton - @Provides - @GraphTaskExecutor - public TaskExecutor graphTaskExecutor( final GraphFig graphFig ) { - return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount(), - graphFig.getShardAuditWorkerQueueSize() ); - } - /** http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java 
---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java index 9c0c62d..9a8a00f 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java @@ -120,12 +120,9 @@ public class GraphManagerImpl implements GraphManager { @Inject public GraphManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization, final EdgeSerialization storageEdgeSerialization, - final NodeSerialization nodeSerialization, - final GraphFig graphFig, - final EdgeDeleteListener edgeDeleteListener, - final NodeDeleteListener nodeDeleteListener, - final ApplicationScope scope, - MetricsFactory metricsFactory) { + final NodeSerialization nodeSerialization, final GraphFig graphFig, + final EdgeDeleteListener edgeDeleteListener, final NodeDeleteListener nodeDeleteListener, + final ApplicationScope scope, MetricsFactory metricsFactory ) { ValidationUtils.validateApplicationScope( scope ); @@ -146,36 +143,34 @@ public class GraphManagerImpl implements GraphManager { this.edgeDeleteSubcriber = MetricSubscriber.INSTANCE; this.nodeDelete = MetricSubscriber.INSTANCE; - this.writeEdgeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "write.edge.meter"); - this.writeEdgeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "write.edge.timer"); - this.deleteEdgeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "delete.edge.meter"); - this.deleteEdgeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "delete.edge.timer"); - this.deleteNodeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "delete.node.meter"); - this.deleteNodeTimer = 
metricsFactory.getTimer(GraphManagerImpl.class, "delete.node.timer"); - this.loadEdgesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.from.meter"); - this.loadEdgesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.from.timer"); - this.loadEdgesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.to.meter"); - this.loadEdgesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.to.timer"); - this.loadEdgesVersionsMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.versions.meter"); - this.loadEdgesVersionsTimer = metricsFactory.getTimer(GraphManagerImpl.class,"load.versions.timer"); - this.loadEdgesFromSourceByTypeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.from.type.meter"); - this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.from.type.timer"); - this.loadEdgesToTargetByTypeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.to.type.meter"); - this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.to.type.timer"); - - this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.edge.from.timer"); - this.getEdgeTypesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.edge.from.meter"); - - this.getIdTypesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.idtype.from.timer"); - this.getIdTypesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.idtype.from.meter"); - - this.getEdgeTypesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.edge.to.timer"); - this.getEdgeTypesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.edge.to.meter"); - - this.getIdTypesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class, "get.idtype.to.timer"); - this.getIdTypesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.idtype.to.meter"); - - + 
this.writeEdgeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "write.edge.meter" ); + this.writeEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "write.edge.timer" ); + this.deleteEdgeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "delete.edge.meter" ); + this.deleteEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "delete.edge.timer" ); + this.deleteNodeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "delete.node.meter" ); + this.deleteNodeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "delete.node.timer" ); + this.loadEdgesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.from.meter" ); + this.loadEdgesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.from.timer" ); + this.loadEdgesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.to.meter" ); + this.loadEdgesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.to.timer" ); + this.loadEdgesVersionsMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.versions.meter" ); + this.loadEdgesVersionsTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.versions.timer" ); + this.loadEdgesFromSourceByTypeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.from.type.meter" ); + this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.from.type.timer" ); + this.loadEdgesToTargetByTypeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.to.type.meter" ); + this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.to.type.timer" ); + + this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.edge.from.timer" ); + this.getEdgeTypesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.edge.from.meter" ); + + this.getIdTypesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.idtype.from.timer" ); + 
this.getIdTypesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.idtype.from.meter" ); + + this.getEdgeTypesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.edge.to.timer" ); + this.getEdgeTypesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.edge.to.meter" ); + + this.getIdTypesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.idtype.to.timer" ); + this.getIdTypesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.idtype.to.meter" ); } @@ -209,41 +204,39 @@ public class GraphManagerImpl implements GraphManager { return edge; } - } ) - .doOnEach(new Action1<Notification<? super Edge>>() { - @Override - public void call(Notification<? super Edge> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).doOnEach( new Action1<Notification<? super Edge>>() { + @Override + public void call( Notification<? super Edge> notification ) { + meter.mark(); + } + } ).doOnCompleted( new Action0() { + @Override + public void call() { + timer.stop(); + } + } ); } @Override - public Observable<Edge> deleteEdge( final Edge edge ) { - GraphValidation.validateEdge(edge); + public Observable<Edge> markEdge( final Edge edge ) { + GraphValidation.validateEdge( edge ); - final MarkedEdge markedEdge = new SimpleMarkedEdge(edge, true); + final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true ); final Timer.Context timer = deleteEdgeTimer.time(); final Meter meter = deleteEdgeMeter; - return Observable.just(markedEdge).map(new Func1<MarkedEdge, Edge>() { + return Observable.just( markedEdge ).map( new Func1<MarkedEdge, Edge>() { @Override - public Edge call(final MarkedEdge edge) { + public Edge call( final MarkedEdge edge ) { final UUID timestamp = UUIDGenerator.newTimeUUID(); - final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge(scope, edge, timestamp); + final MutationBatch 
edgeMutation = storageEdgeSerialization.writeEdge( scope, edge, timestamp ); - LOG.debug("Marking edge {} as deleted to commit log", edge); + LOG.debug( "Marking edge {} as deleted to commit log", edge ); try { edgeMutation.execute(); } @@ -254,73 +247,50 @@ public class GraphManagerImpl implements GraphManager { //HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge, // timestamp )).subscribeOn( Schedulers.io() ).subscribe( edgeDeleteSubcriber ); - edgeDeleteListener.receive(scope, markedEdge, timestamp).subscribeOn(Schedulers.io()) - .subscribe(edgeDeleteSubcriber); + edgeDeleteListener.receive( scope, markedEdge, timestamp ).subscribeOn( Schedulers.io() ) + .subscribe( edgeDeleteSubcriber ); return edge; } - }) - .doOnEach(new Action1<Notification<? super Edge>>() { - @Override - public void call(Notification<? super Edge> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @Override - public Observable<Id> deleteNode( final Id node, final long timestamp ) { + public Observable<Id> markNode( final Id node, final long timestamp ) { final Timer.Context timer = deleteNodeTimer.time(); final Meter meter = deleteNodeMeter; - return Observable.just( node ).map( new Func1<Id, Id>() { - @Override - public Id call( final Id id ) { + return Observable.just( node ).map( id -> { - //mark the node as deleted + //mark the node as deleted - final UUID eventTimestamp = UUIDGenerator.newTimeUUID(); + final UUID eventTimestamp = UUIDGenerator.newTimeUUID(); - final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, timestamp ); + final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, timestamp ); - LOG.debug( "Marking node {} as deleted to node mark", node ); - try { - nodeMutation.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( 
"Unable to execute mutation", e ); - } + LOG.debug( "Marking node {} as deleted to node mark", node ); + try { + nodeMutation.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to execute mutation", e ); + } - //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp )).subscribeOn( - // Schedulers.io() ).subscribe( nodeDelete ); - nodeDeleteListener.receive( scope, id, eventTimestamp ).subscribeOn( Schedulers.io() ) - .subscribe( nodeDelete ); + //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp )).subscribeOn( + // Schedulers.io() ).subscribe( nodeDelete ); + nodeDeleteListener.receive( scope, id, eventTimestamp ).subscribeOn( Schedulers.io() ) + .subscribe( nodeDelete ); - return id; - } - } ) - .doOnEach(new Action1<Notification<? super Id>>() { - @Override - public void call(Notification<? super Id> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + return id; + } ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -333,20 +303,11 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge ); } - } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(searchByEdge.getMaxTimestamp())) - .cast(Edge.class) - .doOnEach(new Action1<Notification<? super Edge>>() { - @Override - public void call(Notification<? 
super Edge> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) ) + .cast( Edge.class ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -359,20 +320,11 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgesFromSource( scope, search ); } - } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp())) - .cast(Edge.class) - .doOnEach(new Action1<Notification<? super Edge>>() { - @Override - public void call(Notification<? super Edge> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class ) + .doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -385,20 +337,11 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgesToTarget( scope, search ); } - } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp())) - .cast(Edge.class) - .doOnEach(new Action1<Notification<? super Edge>>() { - @Override - public void call(Notification<? 
super Edge> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class ) + .doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -411,21 +354,12 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search ); } - } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp())) + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ) - .cast(Edge.class) - .doOnEach(new Action1<Notification<? super Edge>>() { - @Override - public void call(Notification<? super Edge> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + .cast( Edge.class ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -438,20 +372,11 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search ); } - } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp())) - .cast(Edge.class) - .doOnEach(new Action1<Notification<? super Edge>>() { - @Override - public void call(Notification<? 
super Edge> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class ) + .doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -464,19 +389,9 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<String> getIterator() { return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search ); } - } ) - .doOnEach(new Action1<Notification<? super String>>() { - @Override - public void call(Notification<? super String> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -489,19 +404,9 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<String> getIterator() { return edgeMetadataSerialization.getIdTypesFromSource( scope, search ); } - } ) - .doOnEach(new Action1<Notification<? super String>>() { - @Override - public void call(Notification<? super String> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -514,19 +419,9 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<String> getIterator() { return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search ); } - } ) - .doOnEach(new Action1<Notification<? super String>>() { - @Override - public void call(Notification<? 
super String> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -539,19 +434,9 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<String> getIterator() { return edgeMetadataSerialization.getIdTypesToTarget( scope, search ); } - } ) - .doOnEach(new Action1<Notification<? super String>>() { - @Override - public void call(Notification<? super String> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -561,10 +446,12 @@ public class GraphManagerImpl implements GraphManager { private class EdgeBufferFilter implements Func1<List<MarkedEdge>, Observable<MarkedEdge>> { private final long maxVersion; + private final boolean filterMarked; - private EdgeBufferFilter( final long maxVersion ) { + private EdgeBufferFilter( final long maxVersion, final boolean filterMarked ) { this.maxVersion = maxVersion; + this.filterMarked = filterMarked; } @@ -579,57 +466,36 @@ public class GraphManagerImpl implements GraphManager { public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) { final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges ); - return Observable.from( markedEdges ).filter( new EdgeFilter( this.maxVersion, markedVersions ) ); - } - } - - - /** - * Filter the returned values based on the max uuid and if it's been marked for deletion or not - */ - private static class EdgeFilter implements Func1<MarkedEdge, Boolean> { - - private final long maxTimestamp; - - private final Map<Id, Long> markCache; - - - private EdgeFilter( final long maxTimestamp, Map<Id, Long> markCache ) { - this.maxTimestamp = maxTimestamp; - this.markCache = markCache; 
- } - - - @Override - public Boolean call( final MarkedEdge edge ) { - + final long maxTimestamp = maxVersion; - final long edgeTimestamp = edge.getTimestamp(); + return Observable.from( markedEdges ).filter( edge -> { + final long edgeTimestamp = edge.getTimestamp(); - //our edge needs to not be deleted and have a version that's > max Version - if ( edge.isDeleted() || Long.compare( edgeTimestamp, maxTimestamp ) > 0 ) { - return false; - } + //our edge needs to not be deleted and have a version that's > max Version + if ( edge.isDeleted() || Long.compare( edgeTimestamp, maxTimestamp ) > 0 ) { + return false; + } - final Long sourceTimestamp = markCache.get( edge.getSourceNode() ); + final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() ); - //the source Id has been marked for deletion. It's version is <= to the marked version for deletion, - // so we need to discard it - if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) { - return false; - } + //the source Id has been marked for deletion. It's version is <= to the marked version for deletion, + // so we need to discard it + if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) { + return false; + } - final Long targetTimestamp = markCache.get( edge.getTargetNode() ); + final Long targetTimestamp = markedVersions.get( edge.getTargetNode() ); - //the target Id has been marked for deletion. It's version is <= to the marked version for deletion, - // so we need to discard it - if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) { - return false; - } + //the target Id has been marked for deletion. 
Its version is <= the marked version for deletion, + // so we need to discard it + if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) { + return false; + } - return true; + return true; + } ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java index a28a0bb..9a23caf 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java @@ -33,7 +33,6 @@ import com.google.common.base.Preconditions; /** * Simple bean implementation of search by edge - * */ public class SimpleSearchByEdge implements SearchByEdge { @@ -43,24 +42,44 @@ public class SimpleSearchByEdge implements SearchByEdge { private final long maxTimestamp; private final Optional<Edge> last; private final SearchByEdgeType.Order order; + private final boolean filterMarked; /** * Create the search modules + * * @param sourceNode The source node of the edge * @param targetNode The target node of the edge * @param type The edge type * @param maxTimestamp The maximum timestamp to seek from * @param last The value to start seeking from.
Must be >= this value */ - public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp, final SearchByEdgeType.Order order, final Optional<Edge> last ) { + public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp, + final SearchByEdgeType.Order order, final Optional<Edge> last ) { + this( sourceNode, type, targetNode, maxTimestamp, order, last, true ); + } + + + /** + * Create the search modules + * + * @param sourceNode The source node of the edge + * @param type The edge type + * @param targetNode The target node of the edge + * @param maxTimestamp The maximum timestamp to seek from + * @param last The value to start seeking from. Must be >= this value + */ + public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp, + final SearchByEdgeType.Order order, final Optional<Edge> last, + final boolean filterMarked ) { - ValidationUtils.verifyIdentity(sourceNode); - ValidationUtils.verifyIdentity(targetNode); - ValidationUtils.verifyString(type, "type"); - GraphValidation.validateTimestamp(maxTimestamp, "maxTimestamp"); - Preconditions.checkNotNull(order, "order must not be null"); - Preconditions.checkNotNull(last, "last can never be null"); + + ValidationUtils.verifyIdentity( sourceNode ); + ValidationUtils.verifyIdentity( targetNode ); + ValidationUtils.verifyString( type, "type" ); + GraphValidation.validateTimestamp( maxTimestamp, "maxTimestamp" ); + Preconditions.checkNotNull( order, "order must not be null" ); + Preconditions.checkNotNull( last, "last can never be null" ); this.sourceNode = sourceNode; @@ -69,6 +88,7 @@ public class SimpleSearchByEdge implements SearchByEdge { this.maxTimestamp = maxTimestamp; this.order = order; this.last = last; + this.filterMarked = filterMarked; } @@ -97,6 +117,10 @@ public class SimpleSearchByEdge implements SearchByEdge { @Override + public boolean filterMarked() { 
return filterMarked; } + + + @Override public Optional<Edge> last() { return last; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java index 1687162..9392dbc 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java @@ -41,6 +41,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{ private final long maxTimestamp; private final Optional<Edge> last; private final Order order; + private final boolean filterMarked; /** @@ -55,7 +56,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{ * //TODO, make last an optional */ public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, final Edge last ) { - this(node, type, maxTimestamp, order, Optional.fromNullable(last)); + this(node, type, maxTimestamp, order, Optional.fromNullable(last), true); } @@ -70,7 +71,24 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{ * * //TODO, make last an optional */ - public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, final Optional<Edge> last ) { + public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, + final Optional<Edge> last ) { + this( node, type, maxTimestamp, order, last, true ); + } + + + /** + * Create the search modules + * @param node The node to 
search from + * @param type The edge type + * @param maxTimestamp The maximum timestamp to return + * @param order The sort order. Descending is most efficient + * @param last The value to start seeking from. Must be >= this value + * @param filterMarked + */ + public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, + final Optional<Edge> last, final boolean filterMarked ) { + Preconditions.checkNotNull( order, "order is required"); ValidationUtils.verifyIdentity( node ); @@ -84,6 +102,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{ this.maxTimestamp = maxTimestamp; this.order = order; this.last = last; + this.filterMarked = filterMarked; } @@ -118,6 +137,12 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{ @Override + public boolean filterMarked() { + return filterMarked; + } + + + @Override public boolean equals( final Object o ) { if ( this == o ) { return true;
