Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-907 [created] 9bebb069b
Adds a simple 30 second flush interval task Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9bebb069 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9bebb069 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9bebb069 Branch: refs/heads/USERGRID-907 Commit: 9bebb069bd7fef26dcc81205c0384588593df064 Parents: 566046b Author: Todd Nine <tn...@apigee.com> Authored: Tue Aug 11 16:08:30 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Tue Aug 11 16:08:30 2015 -0600 ---------------------------------------------------------------------- .../apache/usergrid/count/AbstractBatcher.java | 70 +++++++++++++++----- .../main/resources/usergrid-core-context.xml | 1 + .../apache/usergrid/persistence/CounterIT.java | 50 ++++++++++++++ 3 files changed, 106 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9bebb069/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java b/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java index cd2d2e9..e7dd439 100644 --- a/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java +++ b/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java @@ -21,12 +21,18 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.usergrid.count.common.Count; import com.yammer.metrics.Metrics; import com.yammer.metrics.core.Counter; @@ -42,17 +48,31 @@ import com.yammer.metrics.core.TimerContext; public abstract class AbstractBatcher implements Batcher { protected BatchSubmitter batchSubmitter; + protected static final Logger logger = LoggerFactory.getLogger( AbstractBatcher.class ); + private volatile Batch batch; private final AtomicLong opCount = new AtomicLong(); private final Timer addTimer = Metrics.newTimer( AbstractBatcher.class, "add_invocation", TimeUnit.MICROSECONDS, TimeUnit.SECONDS ); protected final Counter invocationCounter = Metrics.newCounter( AbstractBatcher.class, "batch_add_invocations" ); - private final Counter existingCounterHit = Metrics.newCounter( AbstractBatcher.class, "counter_existed" ); // TODO add batchCount, remove shouldSubmit, impl submit, change simpleBatcher to just be an extension protected int batchSize = 500; + protected int batchIntervalSeconds = 10; private final AtomicLong batchSubmissionCount = new AtomicLong(); - private final AtomicBoolean lock = new AtomicBoolean( false ); + /** + * Create our scheduler to fire our execution + */ + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 1 ); + + + /** + * Set the batch interval in seconds + * @param batchIntervalSeconds + */ + public void setBatchInterval(int batchIntervalSeconds){ + this.batchIntervalSeconds = batchIntervalSeconds; + } public void setBatchSize( int batchSize ) { this.batchSize = batchSize; @@ -88,16 +108,23 @@ public abstract class AbstractBatcher implements Batcher { } - Batch getBatch() { + private Batch getBatch() { Batch active = batch; if ( active == null ) { synchronized ( this ) { active = batch; if ( active == null ) { batch = active = new Batch(); + + //now schedule our task for execution since we're creating a batch + scheduler.scheduleWithFixedDelay( new BatchFlusher(), this.batchIntervalSeconds, + this.batchIntervalSeconds, TimeUnit.SECONDS ); + } } } + + //we want to flush, and we have no capacity left, perform a flush if ( batchSize > 1 && active.getCapacity() == 0 ) { synchronized ( this ) { if ( active.getCapacity() == 0 ) { @@ -105,9 +132,29 @@ public abstract class AbstractBatcher implements Batcher { } } } + return active; } + private void flush(){ + synchronized(this) { + getBatch().flush(); + } + } + + + /** + * Runnable that will flush the batch every 30 seconds + */ + private final class BatchFlusher implements Runnable { + + @Override + public void run() { + //explicitly flush the batch + AbstractBatcher.this.flush(); + } + } + public long getBatchSubmissionCount() { return batchSubmissionCount.get(); @@ -118,11 +165,9 @@ public abstract class AbstractBatcher implements Batcher { private BlockingQueue<Count> counts; private final AtomicInteger localCallCounter = new AtomicInteger(); - private final AtomicBoolean lock = new AtomicBoolean( false ); - Batch() { - counts = new ArrayBlockingQueue<Count>( batchSize ); + counts = new ArrayBlockingQueue<>( batchSize ); } @@ -131,6 +176,8 @@ public abstract class AbstractBatcher implements Batcher { } + + void flush() { ArrayList<Count> flushed = new ArrayList<Count>( batchSize ); counts.drainTo( flushed ); @@ -146,7 +193,7 @@ public abstract class AbstractBatcher implements Batcher { counts.offer( count, 500, TimeUnit.MILLISECONDS ); } catch ( Exception ex ) { - ex.printStackTrace(); + logger.error( "Unable to add count, dropping count {}", count, ex ); } } @@ -157,7 +204,7 @@ public abstract class AbstractBatcher implements Batcher { f.get(); } catch ( Exception ex ) { - ex.printStackTrace(); + logger.error( "Unable to add count, dropping count {}", count, ex ); } batchSubmissionCount.incrementAndGet(); opCount.incrementAndGet(); @@ -165,12 +212,5 @@ public abstract class AbstractBatcher implements Batcher { } - /** - * The number of times the {@link #add(org.apache.usergrid.count.common.Count)} method has been invoked on this batch - * instance - */ - public int getLocalCallCount() { - return localCallCounter.get(); - } } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9bebb069/stack/core/src/main/resources/usergrid-core-context.xml ---------------------------------------------------------------------- diff --git a/stack/core/src/main/resources/usergrid-core-context.xml b/stack/core/src/main/resources/usergrid-core-context.xml index 53ae4e2..f3eb482 100644 --- a/stack/core/src/main/resources/usergrid-core-context.xml +++ b/stack/core/src/main/resources/usergrid-core-context.xml @@ -134,6 +134,7 @@ <bean id="simpleBatcher" class="org.apache.usergrid.count.SimpleBatcher"> <property name="batchSubmitter" ref="batchSubmitter"/> + <property name="batchInterval" value="${usergrid.counter.batch.interval}"/> <property name="batchSize" value="${usergrid.counter.batch.size}"/> </bean> http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9bebb069/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java index a394a79..2c72d26 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java @@ -263,4 +263,54 @@ public class CounterIT extends AbstractCoreIT { assertEquals( 1, r.getCounters().get( 0 ).getValues().get( 0 ).getValue() - originalCount ); } + + + + + @Test + public void testTimedFlush() throws Exception { + LOG.info( "CounterIT.testCounters" ); + + EntityManager em = app.getEntityManager(); + + + assertNotNull( em ); + + + UUID user1 = UUID.randomUUID(); + UUID user2 = UUID.randomUUID(); + // UUID groupId = UUID.randomUUID(); + + + Event event = null; + + for ( int i = 0; i < 100; i++ ) { + event = new Event(); + event.setTimestamp( ts + ( i * 60 * 1000 ) ); + event.addCounter( "visits", 1 ); + event.setUser( user1 ); + em.create( event ); + + event = new Event(); + event.setTimestamp( ts + ( i * 60 * 1000 ) ); + event.addCounter( "visits", 1 ); + event.setUser( user2 ); + em.create( event ); + } + + //sleep to ensure the flush has executed + Thread.sleep( 30000 ); + + Results r = em.getAggregateCounters( null, null, null, "visits", CounterResolution.SIX_HOUR, ts, System.currentTimeMillis(), false ); + + final AggregateCounterSet counter = r.getCounters().get( 0 ); + + final long count = counter.getValues().get( 0 ).getValue(); + + final String name = counter.getName(); + + assertEquals("visits", name); + assertEquals(count, 200); + + } }