Removes approximation logic on shards to ensure that we're auditing the actual shard value.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/591a2f1f Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/591a2f1f Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/591a2f1f Branch: refs/heads/2.1-release Commit: 591a2f1fd97b7259cf72d10855f2832b8342b5a4 Parents: 9c54de3 Author: Todd Nine <[email protected]> Authored: Thu Nov 5 14:37:39 2015 -0700 Committer: Todd Nine <[email protected]> Committed: Thu Nov 5 14:40:08 2015 -0700 ---------------------------------------------------------------------- .../persistence/graph/guice/GraphModule.java | 7 - .../impl/shard/EdgeShardStrategy.java | 10 +- .../impl/shard/NodeShardApproximation.java | 66 -- .../serialization/impl/shard/count/Counter.java | 131 ---- .../shard/count/NodeShardApproximationImpl.java | 272 -------- .../count/NodeShardCounterSerialization.java | 48 -- .../NodeShardCounterSerializationImpl.java | 186 ------ .../impl/shard/count/ShardKey.java | 75 --- .../shard/impl/NodeShardAllocationImpl.java | 19 +- .../impl/shard/impl/ShardGroupDeletionImpl.java | 7 + .../impl/ShardedEdgeSerializationImpl.java | 28 - .../shard/impl/SizebasedEdgeShardStrategy.java | 13 +- .../graph/GraphManagerShardingIT.java | 208 ------ .../impl/shard/NodeShardAllocationTest.java | 48 +- .../shard/count/NodeShardApproximationTest.java | 627 ------------------- .../NodeShardCounterSerializationTest.java | 124 ---- .../shard/impl/ShardGroupDeletionImplTest.java | 3 + 17 files changed, 25 insertions(+), 1847 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java index d2476eb..cf0ffcb 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java @@ -59,14 +59,10 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumn import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization; import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy; import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation; import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache; import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction; import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion; import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardApproximationImpl; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerializationImpl; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.AsyncTaskExecutorImpl; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardSerializationImpl; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl; @@ -114,9 +110,7 @@ public abstract class GraphModule extends AbstractModule { */ bind(NodeShardAllocation.class).to( NodeShardAllocationImpl.class ); - bind( NodeShardApproximation.class ).to( NodeShardApproximationImpl.class ); bind( NodeShardCache.class ).to( NodeShardCacheImpl.class ); - bind( NodeShardCounterSerialization.class ).to( NodeShardCounterSerializationImpl.class ); /** * Binding for task tracker @@ -182,7 +176,6 @@ public abstract class GraphModule extends AbstractModule { migrationBinding.addBinding().to( Key.get( EdgeColumnFamilies.class ) ); migrationBinding.addBinding().to( Key.get( EdgeShardSerialization.class ) ); - migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class ) ); //Get the old version and the new one migrationBinding.addBinding().to( Key.get( EdgeMetadataSerializationV1Impl.class) ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java index 1e02a72..803e31e 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java @@ -44,15 +44,7 @@ public interface EdgeShardStrategy { */ public Iterator<ShardEntryGroup> getReadShards(final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta ); - /** - * Increment our count meta data by the passed value. Can be a positive or a negative number. - * @param scope The scope in the application - * @param shard The shard to use - * @param count The amount to increment or decrement - * @param directedEdgeMeta The edge meta data to use - * @return - */ - public void increment(final ApplicationScope scope, Shard shard, long count, final DirectedEdgeMeta directedEdgeMeta ); + http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java deleted file mode 100644 index fc39e56..0000000 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.persistence.graph.serialization.impl.shard; - - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; - - -/** - * Interface for creating approximate estimates of shards - */ -public interface NodeShardApproximation { - - - /** - * Increment the shard Id the specified amount - * - * @param scope The scope - * @param shard The shard to use - * @param count The count to increment - * @param directedEdgeMeta The directed edge meta data to use - */ - public void increment( final ApplicationScope scope, final Shard shard, - final long count, final DirectedEdgeMeta directedEdgeMeta ); - - - /** - * Get the approximation of the number of unique items - * - * @param scope The scope - * @param directedEdgeMeta The directed edge meta data to use - */ - public long getCount( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta ); - - - /** - * Flush the current counters in the Approximation. Will return immediately after the flush. You can then use flushPending - * to check the state. - */ - public void beginFlush(); - - /** - * Return true if there is data to be flushed - * @return - */ - public boolean flushPending(); - - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java deleted file mode 100644 index f5666a2..0000000 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.graph.serialization.impl.shard.count; - - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - -import com.google.common.base.Preconditions; - - -/** - * This class is synchronized for addition. It is meant to be used across multiple threads - */ -public class Counter { - /** - * The counter to tell us how often it was invoked - */ - private final AtomicLong invokeCounter; - - /** - * Pointer to our "current" counter map. We beginFlush this when time expires or we hit our count - */ - private final ConcurrentHashMap<ShardKey, AtomicLong> counts; - - /** - * The timestamp the concurrent map was created - */ - private final long createTimestamp; - - - /** - * Implementation of the internal counters - */ - public Counter() { - this.createTimestamp = System.currentTimeMillis(); - this.invokeCounter = new AtomicLong(); - this.counts = new ConcurrentHashMap<>(); - } - - - /** - * Add the count to the key. - */ - public void add( final ShardKey key, final long count ) { - AtomicLong counter = counts.get( key ); - - if ( counter == null ) { - counter = new AtomicLong(); - AtomicLong existingCounter = counts.putIfAbsent( key, counter ); - - if ( existingCounter != null ) { - counter = existingCounter; - } - } - - counter.addAndGet( count ); - invokeCounter.incrementAndGet(); - } - - - /** - * Get the current valye from the cache - */ - public long get( final ShardKey key ) { - AtomicLong counter = counts.get( key ); - - if ( counter == null ) { - return 0; - } - - return counter.get(); - } - - - /** - * Deep copy the counts from other into this counter - * @param other - */ - public void merge(final Counter other){ - - Preconditions.checkNotNull(other, "other cannot be null"); - Preconditions.checkNotNull( other.counts, "other.counts cannot be null" ); - - for(Map.Entry<ShardKey, AtomicLong> entry: other.counts.entrySet()){ - add(entry.getKey(), entry.getValue().get()); - } - } - - - /** - * Get all entries - * @return - */ - public Set<Map.Entry<ShardKey, AtomicLong>> getEntries(){ - return counts.entrySet(); - } - - - /** - * Get the count of the number of times we've been incremented - * @return - */ - public long getInvokeCount() { - return invokeCounter.get(); - } - - - - public long getCreateTimestamp() { - return createTimestamp; - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java deleted file mode 100644 index fceb32c..0000000 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.persistence.graph.serialization.impl.shard.count; - - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import javax.inject.Inject; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.core.consistency.TimeService; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.graph.GraphFig; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; - -import com.netflix.astyanax.MutationBatch; -import com.netflix.hystrix.HystrixCommand; -import com.netflix.hystrix.HystrixCommandGroupKey; -import com.netflix.hystrix.HystrixThreadPoolProperties; - -import rx.functions.Action0; -import rx.schedulers.Schedulers; - - -/** - * Implementation for doing edge approximation based on counters. Uses a guava loading cache to load values from - * cassandra, and beginFlush them on cache eviction. - */ -public class NodeShardApproximationImpl implements NodeShardApproximation { - - private static final Logger LOG = LoggerFactory.getLogger(NodeShardApproximationImpl.class); - - /** - * Read write locks to ensure we atomically swap correctly - */ - private final ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(); - private final ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock(); - private final ReentrantReadWriteLock.WriteLock writeLockLock = reentrantReadWriteLock.writeLock(); - - private final GraphFig graphFig; - private final NodeShardCounterSerialization nodeShardCounterSerialization; - private final TimeService timeService; - - /** - * Counter currently implemented - */ - private volatile Counter currentCounter; - - /** - * The counter that is currently in process of flushing to Cassandra. Can be null - */ - private final BlockingQueue<Counter> flushQueue; - - private final FlushWorker worker; - - /** - * Command group used for realtime user commands - */ - public static final HystrixCommand.Setter - COUNT_GROUP = HystrixCommand.Setter.withGroupKey( - HystrixCommandGroupKey.Factory.asKey( "BatchCounterRollup" ) ).andThreadPoolPropertiesDefaults( - HystrixThreadPoolProperties.Setter().withCoreSize( 100 ) ); - - - /** - * Create a time shard approximation with the correct configuration. - */ - @Inject - public NodeShardApproximationImpl( final GraphFig graphFig, - final NodeShardCounterSerialization nodeShardCounterSerialization, - final TimeService timeService ) { - this.graphFig = graphFig; - this.nodeShardCounterSerialization = nodeShardCounterSerialization; - this.timeService = timeService; - this.currentCounter = new Counter(); - this.flushQueue = new LinkedBlockingQueue<>( graphFig.getCounterFlushQueueSize() ); - - this.worker = new FlushWorker( this.flushQueue, nodeShardCounterSerialization ); - - Schedulers.newThread().createWorker().schedule( worker ); - - } - - - @Override - public void increment( - final ApplicationScope scope, final Shard shard, - final long count, final DirectedEdgeMeta directedEdgeMeta ) { - - - final ShardKey key = new ShardKey( scope, shard, directedEdgeMeta ); - - readLock.lock(); - - - try { - currentCounter.add( key, count ); - } - finally { - readLock.unlock(); - } - - checkFlush(); - } - - - @Override - public long getCount( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) { - - final ShardKey key = new ShardKey( scope, shard, directedEdgeMeta ); - - - readLock.lock(); - - long count; - - try { - count = currentCounter.get( key ); - - } - finally { - readLock.unlock(); - } - - - //read from Cassandra and add to get a "close enough" number - return count + nodeShardCounterSerialization.getCount( key ); - } - - - @Override - public void beginFlush() { - - writeLockLock.lock(); - - try { - - final boolean queued = flushQueue.offer( currentCounter ); - - /** - * We were able to q the beginFlush, swap it - */ - if ( queued ) { - currentCounter = new Counter(); - } - } - finally { - writeLockLock.unlock(); - } - } - - - @Override - public boolean flushPending() { - return flushQueue.size() > 0 || worker.isFlushing(); - } - - - /** - * Check if we need to beginFlush. If we do, perform the beginFlush - */ - private void checkFlush() { - - //there's no beginFlush pending and we're past the timeout or count - if ( currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() > timeService.getCurrentTime() - || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) { - beginFlush(); - } - } - - - /** - * Worker that will take from the queue - */ - private static class FlushWorker implements Action0 { - - private final BlockingQueue<Counter> counterQueue; - private final NodeShardCounterSerialization nodeShardCounterSerialization; - - private volatile Counter rollUp; - - - private FlushWorker( final BlockingQueue<Counter> counterQueue, - final NodeShardCounterSerialization nodeShardCounterSerialization ) { - this.counterQueue = counterQueue; - this.nodeShardCounterSerialization = nodeShardCounterSerialization; - } - - - @Override - public void call() { - - - while ( true ) { - /** - * Block taking the first element. Once we take this, batch drain and roll up the rest - */ - - try { - rollUp = null; - rollUp = counterQueue.take(); - } - catch ( InterruptedException e ) { - LOG.error( "Unable to read from counter queue", e ); - throw new RuntimeException( "Unable to read from counter queue", e ); - - } - - - - - //copy to the batch outside of the command for performance - final MutationBatch batch = nodeShardCounterSerialization.flush( rollUp ); - - /** - * Execute the command in hystrix to avoid slamming cassandra - */ - new HystrixCommand( COUNT_GROUP ) { - - @Override - protected Void run() throws Exception { - batch.execute(); - - return null; - } - - - @Override - protected Object getFallback() { - //we've failed to mutate. Merge this count back into the current one - counterQueue.offer( rollUp ); - - return null; - } - }.execute(); - } - - } - - - /** - * Return true if we're in the process of flushing - * @return - */ - public boolean isFlushing(){ - return rollUp != null; - } - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java deleted file mode 100644 index aafbd26..0000000 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.graph.serialization.impl.shard.count; - - -import org.apache.usergrid.persistence.core.migration.schema.Migration; - -import com.netflix.astyanax.MutationBatch; - - -/** - * Serialization for flushing and reading counters - */ -public interface NodeShardCounterSerialization extends Migration { - - - /** - * Flush the counter to the mutation batch - * @param counter - * @return - */ - public MutationBatch flush(Counter counter); - - - /** - * Get the count of this shard, if it exists. - * @param key The shard key to get - * @return - */ - public long getCount(ShardKey key); - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java deleted file mode 100644 index 6934275..0000000 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.graph.serialization.impl.shard.count; - - -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.db.marshal.CounterColumnType; - -import org.apache.usergrid.persistence.core.astyanax.CassandraConfig; -import org.apache.usergrid.persistence.core.astyanax.ColumnTypes; -import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer; -import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer; -import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily; -import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition; -import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; -import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import org.apache.usergrid.persistence.graph.GraphFig; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Preconditions; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import com.netflix.astyanax.Keyspace; -import com.netflix.astyanax.MutationBatch; -import com.netflix.astyanax.connectionpool.OperationResult; -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; -import com.netflix.astyanax.connectionpool.exceptions.NotFoundException; -import com.netflix.astyanax.model.Column; -import com.netflix.astyanax.model.CompositeBuilder; -import com.netflix.astyanax.model.CompositeParser; -import com.netflix.astyanax.serializers.BooleanSerializer; - - -@Singleton -public class NodeShardCounterSerializationImpl implements NodeShardCounterSerialization { - - - private static final ShardKeySerializer SHARD_KEY_SERIALIZER = new ShardKeySerializer(); - - /** - * Edge shards - */ - private static final MultiTennantColumnFamily<ScopedRowKey<ShardKey>, Boolean> EDGE_SHARD_COUNTS = - new MultiTennantColumnFamily<>( "Edge_Shard_Counts", - new ScopedRowKeySerializer<>( SHARD_KEY_SERIALIZER ), BooleanSerializer.get() ); - - - protected final Keyspace keyspace; - protected final CassandraConfig cassandraConfig; - protected final GraphFig graphFig; - - - @Inject - public NodeShardCounterSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig, - final GraphFig graphFig ) { - this.keyspace = keyspace; - this.cassandraConfig = cassandraConfig; - this.graphFig = graphFig; - } - - - @Override - public MutationBatch flush( final Counter counter ) { - - - Preconditions.checkNotNull( counter, "counter must be specified" ); - - - final MutationBatch batch = keyspace.prepareMutationBatch(); - - for ( Map.Entry<ShardKey, AtomicLong> entry : counter.getEntries() ) { - - final ShardKey key = entry.getKey(); - final long value = entry.getValue().get(); - - - final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope.getApplication(), key ); - - - batch.withRow( EDGE_SHARD_COUNTS, rowKey ).incrementCounterColumn(true , value ); - } - - - return batch; - } - - - @Override - public long getCount( final ShardKey key ) { - - final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope.getApplication(), key ); - - - OperationResult<Column<Boolean>> column = null; - try { - column = keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ).execute(); - } - //column not found, return 0 - catch ( NotFoundException nfe ) { - return 0; - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to read from cassandra", e ); - } - - return column.getResult().getLongValue(); - } - - - @Override - public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() { - return Collections.singleton( - new MultiTennantColumnFamilyDefinition( EDGE_SHARD_COUNTS, BytesType.class.getSimpleName(), - ColumnTypes.BOOLEAN, CounterColumnType.class.getSimpleName(), - MultiTennantColumnFamilyDefinition.CacheOption.ALL ) ); - } - - - - private static class ShardKeySerializer implements CompositeFieldSerializer<ShardKey> { - - - private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get(); - - private static final EdgeShardRowKeySerializer EDGE_SHARD_ROW_KEY_SERIALIZER = EdgeShardRowKeySerializer.INSTANCE; - - - @Override - public void toComposite( final CompositeBuilder builder, final ShardKey key ) { - - ID_SER.toComposite( builder, key.scope.getApplication() ); - - EDGE_SHARD_ROW_KEY_SERIALIZER.toComposite( builder, key.directedEdgeMeta ); - - builder.addLong( key.shard.getShardIndex() ); - - builder.addLong( key.shard.getCreatedTime() ); - } - - - @Override - public ShardKey fromComposite( final CompositeParser composite ) { - - final Id applicationId = ID_SER.fromComposite( composite ); - - final ApplicationScope scope = new ApplicationScopeImpl( applicationId ); - - final DirectedEdgeMeta directedEdgeMeta = EDGE_SHARD_ROW_KEY_SERIALIZER.fromComposite( composite ); - - final long shardIndex = composite.readLong(); - - final long shardCreatedTime = composite.readLong(); - - return new ShardKey( scope, new Shard( shardIndex, shardCreatedTime, false ), directedEdgeMeta ); - } - - - } - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java deleted file mode 100644 index c976210..0000000 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.graph.serialization.impl.shard.count; - - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; - - -/** - * Key for shards and counts - */ -public class ShardKey { - public final ApplicationScope scope; - public final Shard shard; - public final DirectedEdgeMeta directedEdgeMeta; - - - public ShardKey( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) { - this.scope = scope; - this.shard = shard; - this.directedEdgeMeta = directedEdgeMeta; - } - - - @Override - public boolean equals( final Object o ) { - if ( this == o ) { - return true; - } - if ( o == null || getClass() != o.getClass() ) { - return false; - } - - final ShardKey shardKey = ( ShardKey ) o; - - if ( !directedEdgeMeta.equals( shardKey.directedEdgeMeta ) ) { - return false; - } - if ( !scope.equals( shardKey.scope ) ) { - return false; - } - if ( !shard.equals( shardKey.shard ) ) { - return false; - } - - return true; - } - - - @Override - public int hashCode() { - int result = scope.hashCode(); - result = 31 * result + shard.hashCode(); - result = 31 * result + directedEdgeMeta.hashCode(); - return result; - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java index b0875af..8943737 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java @@ -37,7 +37,6 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEd import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies; import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization; import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation; import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup; import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction; @@ -63,7 +62,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { private final EdgeShardSerialization edgeShardSerialization; private final EdgeColumnFamilies edgeColumnFamilies; private final ShardedEdgeSerialization shardedEdgeSerialization; - private final NodeShardApproximation nodeShardApproximation; private final TimeService timeService; private final GraphFig graphFig; private final ShardGroupCompaction shardGroupCompaction; @@ -72,13 +70,11 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { @Inject public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization, final EdgeColumnFamilies edgeColumnFamilies, - final ShardedEdgeSerialization shardedEdgeSerialization, - final NodeShardApproximation nodeShardApproximation, final TimeService timeService, + final ShardedEdgeSerialization shardedEdgeSerialization, final TimeService timeService, final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction ) { this.edgeShardSerialization = edgeShardSerialization; this.edgeColumnFamilies = edgeColumnFamilies; this.shardedEdgeSerialization = shardedEdgeSerialization; - this.nodeShardApproximation = nodeShardApproximation; this.timeService = timeService; this.graphFig = graphFig; this.shardGroupCompaction = shardGroupCompaction; @@ -166,18 +162,11 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { * Check out if we have a count for our shard allocation */ - final long count = nodeShardApproximation.getCount( scope, shard, directedEdgeMeta ); + final long shardSize = graphFig.getShardSize(); - if ( count < shardSize ) { - return false; - } - - if ( LOG.isDebugEnabled() ) { - LOG.debug( "Count of {} has exceeded shard config of {} will begin compacting", count, shardSize ); - } /** * We want to allocate a new shard as close to the max value as possible. This way if we're filling up a @@ -234,10 +223,10 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { /** - * Sanity check in case our counters become severely out of sync with our edge state in cassandra. + * Sanity check in case we audit before we have a full shard */ if ( marked == null ) { - LOG.warn( "Incorrect shard count for shard group {}", shardEntryGroup ); + LOG.info( "Shard {} in shard group {} not full, not splitting", shardEntryGroup ); return false; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java index 6d2a009..ea10ed5 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java @@ -167,6 +167,13 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion { continue; } + //The shard is not compacted, we cannot remove it. This should never happen, a bit of an "oh shit" scenario. + //the isCompactionPending should return false in this case + if(!shard.isCompacted()){ + logger.warn( "Shard {} in group {} is not compacted yet was checked. Short circuiting", shard, shardEntryGroup ); + return DeleteResult.NO_OP; + } + final MutationBatch shardRemovalMutation = edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java index 2ef50f5..1060495 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java @@ -123,10 +123,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final Shard shard, final boolean isDeleted ) { batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted ); - - if ( !isDeleted ) { - writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta ); - } } }.createBatch( scope, shards, timestamp ); } @@ -153,11 +149,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted ); - - - if ( !isDeleted ) { - writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta ); - } } }.createBatch( scope, shards, timestamp ); } @@ -182,10 +173,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted ); - - if ( !isDeleted ) { - writeEdgeShardStrategy.increment( scope, shard, 1, targetEdgeMeta ); - } } }.createBatch( scope, shards, timestamp ); } @@ -212,11 +199,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ) .putColumn( edge, isDeleted ); - - - if ( !isDeleted ) { - writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta ); - } } }.createBatch( scope, shards, timestamp ); } @@ -241,11 +223,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final boolean isDeleted ) { batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ) .putColumn( column, isDeleted ); - - - if ( !isDeleted ) { - writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta ); - } } }.createBatch( scope, shards, timestamp ); } @@ -265,7 +242,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final Shard shard, final boolean isDeleted ) { batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).deleteColumn( edge ); - writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta ); } }.createBatch( scope, shards, timestamp ); } @@ -288,7 +264,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { batch.withRow( columnFamilies.getSourceNodeTargetTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ) .deleteColumn( edge ); - writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta ); } }.createBatch( scope, shards, timestamp ); } @@ -308,7 +283,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final Shard shard, final boolean isDeleted ) { batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).deleteColumn( edge ); - writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta ); } }.createBatch( scope, shards, timestamp ); } @@ -331,7 +305,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ) .deleteColumn( edge ); - writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta ); } }.createBatch( scope, shards, timestamp ); } @@ -351,7 +324,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final boolean isDeleted ) { batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ) .deleteColumn( column ); - writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta ); } }.createBatch( scope, shards, timestamp ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java index 8787d97..cf164ba 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java @@ -25,9 +25,7 @@ import java.util.Iterator; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation; import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup; import com.google.inject.Inject; @@ -42,14 +40,11 @@ public class SizebasedEdgeShardStrategy implements EdgeShardStrategy { private final NodeShardCache shardCache; - private final NodeShardApproximation shardApproximation; @Inject - public SizebasedEdgeShardStrategy( final NodeShardCache shardCache, - final NodeShardApproximation shardApproximation ) { + public SizebasedEdgeShardStrategy( final NodeShardCache shardCache) { this.shardCache = shardCache; - this.shardApproximation = shardApproximation; } @@ -65,10 +60,4 @@ public class SizebasedEdgeShardStrategy implements EdgeShardStrategy { return shardCache.getReadShardGroup( scope, maxTimestamp, directedEdgeMeta ); } - - @Override - public void increment( final ApplicationScope scope, final Shard shard, - final long count, final DirectedEdgeMeta directedEdgeMeta) { - shardApproximation.increment( scope, shard, count, directedEdgeMeta ); - } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java deleted file mode 100644 index 2951efe..0000000 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. - * - */ -package org.apache.usergrid.persistence.graph; - - -import java.util.concurrent.TimeoutException; - -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import org.apache.usergrid.persistence.core.test.ITRunner; -import org.apache.usergrid.persistence.core.test.UseModules; -import org.apache.usergrid.persistence.core.util.IdGenerator; -import org.apache.usergrid.persistence.graph.guice.TestGraphModule; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; - -import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge; -import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - - -@RunWith( ITRunner.class ) -@UseModules( TestGraphModule.class ) -@Ignore("Kills cassandra") -public class GraphManagerShardingIT { - - - @Inject - @Rule - public MigrationManagerRule migrationManagerRule; - - - @Inject - protected GraphManagerFactory emf; - - - @Inject - protected GraphFig graphFig; - - @Inject - protected NodeShardApproximation nodeShardApproximation; - - protected ApplicationScope scope; - - - - - @Before - public void mockApp() { - this.scope = new ApplicationScopeImpl( IdGenerator.createId( "application" ) ); - } - - - @Test - public void testWriteSourceType() throws TimeoutException, InterruptedException { - - GraphManager gm = emf.createEdgeManager( scope ) ; - - final Id sourceId = IdGenerator.createId( "source" ); - final String edgeType = "test"; - - - - - final long flushCount = graphFig.getCounterFlushCount(); - final long maxShardSize = graphFig.getShardSize(); - - - - - final long startTime = System.currentTimeMillis(); - - //each edge causes 4 counts - final long writeCount = flushCount/4; - - assertTrue( "Shard size must be >= beginFlush Count", maxShardSize >= flushCount ); - - Id targetId = null; - - for(long i = 0; i < writeCount; i ++){ - targetId = IdGenerator.createId( "target" ) ; - - final Edge edge = createEdge( sourceId, edgeType, targetId); - - gm.writeEdge( edge ).toBlocking().last(); - - } - - - - final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( sourceId, edgeType, - targetId.getType() ); - final Shard shard = new Shard(0, 0, true); - - - long shardCount = nodeShardApproximation.getCount( scope, shard, sourceEdgeMeta ); - - assertEquals("Shard count for source node should be the same as write count", writeCount, shardCount); - - - //now verify it's correct for the target - final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType(targetId, edgeType, sourceId.getType() ); - - - shardCount = nodeShardApproximation.getCount( scope, shard, targetEdgeMeta ); - - assertEquals(1, shardCount); - - } - - - @Test - public void testWriteTargetType() throws TimeoutException, InterruptedException { - - GraphManager gm = emf.createEdgeManager( scope ) ; - - final Id targetId = IdGenerator.createId( "target" ); - final String edgeType = "test"; - - - - - final long flushCount = graphFig.getCounterFlushCount(); - final long maxShardSize = graphFig.getShardSize(); - - - //each edge causes 4 counts - final long writeCount = flushCount/4; - - assertTrue( "Shard size must be >= beginFlush Count", maxShardSize >= flushCount ); - - Id sourceId = null; - - for(long i = 0; i < writeCount; i ++){ - sourceId = IdGenerator.createId( "source" ) ; - - final Edge edge = createEdge( sourceId, edgeType, targetId); - - gm.writeEdge( edge ).toBlocking().last(); - - } - - - //this is from target->source, since the target id doesn't change - final DirectedEdgeMeta targetMeta = DirectedEdgeMeta.fromTargetNode( targetId, edgeType ); - final Shard shard = new Shard(0l, 0l, true); - - long targetWithType = nodeShardApproximation.getCount( scope, shard, targetMeta ); - - assertEquals("Shard count for target node should be the same as write count", writeCount, targetWithType); - - - final DirectedEdgeMeta targetNodeSource = DirectedEdgeMeta.fromTargetNodeSourceType( targetId, edgeType, "source" ); - - long shardCount = nodeShardApproximation.getCount( scope, shard, targetNodeSource ); - - assertEquals("Shard count for target node should be the same as write count", writeCount, shardCount); - - - //now verify it's correct for the target - - final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType ); - - shardCount = nodeShardApproximation.getCount( scope, shard, sourceMeta ); - - assertEquals(1, shardCount); - - } - - - - -} - - - - - http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java index ac965cd..bc364cc 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java @@ -98,15 +98,13 @@ public class NodeShardAllocationTest { final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class ); - final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class ); - final TimeService timeService = mock( TimeService.class ); NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - nodeShardCounterSerialization, timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction ); final long timeservicetime = System.currentTimeMillis(); @@ -131,15 +129,12 @@ public class NodeShardAllocationTest { final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class ); - final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class ); - - final TimeService timeService = mock( TimeService.class ); NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - nodeShardCounterSerialization, timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction ); final Id nodeId = IdGenerator.createId( "test" ); final String type = "type"; @@ -175,14 +170,11 @@ public class NodeShardAllocationTest { final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class ); - final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class ); - - final TimeService timeService = mock( TimeService.class ); NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - nodeShardApproximation, timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction ); final Id nodeId = IdGenerator.createId( "test" ); final String type = "type"; @@ -205,8 +197,6 @@ public class NodeShardAllocationTest { final long count = graphFig.getShardSize() - 1; - when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( count ); - final boolean result = approximation.auditShard( scope, shardEntryGroup, targetEdgeMeta ); assertFalse( "Shard allocated", result ); @@ -223,14 +213,12 @@ public class NodeShardAllocationTest { final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class ); - final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class ); - final TimeService timeService = mock( TimeService.class ); NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - nodeShardApproximation, timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction ); final Id nodeId = IdGenerator.createId( "test" ); final String type = "type"; @@ -255,9 +243,6 @@ public class NodeShardAllocationTest { final long shardCount = ( long ) ( graphFig.getShardSize() * 2.5 ); - //return a shard size equal to our max - when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( shardCount ); - //this is how many we should be iterating and should set the value of the last shard we keep final int numToIterate = ( int ) ( graphFig.getShardSize() * 2 ); @@ -338,14 +323,12 @@ public class NodeShardAllocationTest { final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class ); - final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class ); - final TimeService timeService = mock( TimeService.class ); NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - nodeShardApproximation, timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction ); final Id nodeId = IdGenerator.createId( "test" ); final String type = "type"; @@ -378,9 +361,6 @@ public class NodeShardAllocationTest { iteratedEdges.add( returnedEdge ); - //return a shard size equal to our max - when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( shardCount ); - ArgumentCaptor<Shard> shardValue = ArgumentCaptor.forClass( Shard.class ); @@ -429,14 +409,12 @@ public class NodeShardAllocationTest { final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class ); - final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class ); - final TimeService timeService = mock( TimeService.class ); NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - nodeShardApproximation, timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction ); final Id nodeId = IdGenerator.createId( "test" ); final String type = "type"; @@ -456,8 +434,6 @@ public class NodeShardAllocationTest { final long shardCount = graphFig.getShardSize(); - //return a shard size equal to our max - when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( shardCount ); ArgumentCaptor<Shard> shardValue = ArgumentCaptor.forClass( Shard.class ); @@ -495,7 +471,6 @@ public class NodeShardAllocationTest { final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class ); - final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class ); final TimeService timeService = mock( TimeService.class ); @@ -503,7 +478,7 @@ public class NodeShardAllocationTest { NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - nodeShardApproximation, timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction ); final Id nodeId = IdGenerator.createId( "test" ); final String type = "type"; @@ -630,9 +605,6 @@ public class NodeShardAllocationTest { final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class ); - final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class ); - - final TimeService timeService = mock( TimeService.class ); final long returnTime = System.currentTimeMillis() + graphFig.getShardCacheTimeout() * 2; @@ -643,7 +615,7 @@ public class NodeShardAllocationTest { NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - nodeShardApproximation, timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction ); final Id nodeId = IdGenerator.createId( "test" ); final String type = "type"; @@ -712,8 +684,6 @@ public class NodeShardAllocationTest { final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class ); - final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class ); - /** * Return 100000 milliseconds @@ -736,7 +706,7 @@ public class NodeShardAllocationTest { NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - nodeShardApproximation, timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction ); /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java deleted file mode 100644 index 32a0cda..0000000 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java +++ /dev/null @@ -1,627 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.graph.serialization.impl.shard.count; - - -import java.beans.PropertyChangeListener; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; - -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.safehaus.guicyfig.Bypass; -import org.safehaus.guicyfig.OptionState; -import org.safehaus.guicyfig.Overrides; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition; -import org.apache.usergrid.persistence.core.consistency.TimeService; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.util.IdGenerator; -import org.apache.usergrid.persistence.graph.GraphFig; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; -import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.util.UUIDGenerator; - -import com.google.common.util.concurrent.ListenableFuture; -import com.netflix.astyanax.ColumnListMutation; -import com.netflix.astyanax.MutationBatch; -import com.netflix.astyanax.WriteAheadLog; -import com.netflix.astyanax.connectionpool.Host; -import com.netflix.astyanax.connectionpool.OperationResult; -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; -import com.netflix.astyanax.model.ColumnFamily; -import com.netflix.astyanax.model.ConsistencyLevel; -import com.netflix.astyanax.retry.RetryPolicy; - -import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class NodeShardApproximationTest { - - private static final Logger LOG = LoggerFactory.getLogger( NodeShardApproximation.class ); - - private GraphFig graphFig; - - private NodeShardCounterSerialization nodeShardCounterSerialization; - private TimeService timeService; - - protected ApplicationScope scope; - - - @Before - public void setup() { - scope = mock( ApplicationScope.class ); - - Id orgId = mock( Id.class ); - - when( orgId.getType() ).thenReturn( "organization" ); - when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() ); - - when( scope.getApplication() ).thenReturn( orgId ); - - graphFig = mock( GraphFig.class ); - - when( graphFig.getShardCacheSize() ).thenReturn( 10000l ); - when( graphFig.getShardSize() ).thenReturn( 250000l ); - when( graphFig.getCounterFlushQueueSize() ).thenReturn( 10000 ); - - nodeShardCounterSerialization = mock( NodeShardCounterSerialization.class ); - - when( nodeShardCounterSerialization.flush( any( Counter.class ) ) ).thenReturn( mock( MutationBatch.class ) ); - - - timeService = mock( TimeService.class ); - - when( timeService.getCurrentTime() ).thenReturn( System.currentTimeMillis() ); - } - - - @Test - public void testSingleShard() throws InterruptedException { - - - when( graphFig.getCounterFlushCount() ).thenReturn( 100000l ); - NodeShardApproximation approximation = - new NodeShardApproximationImpl( graphFig, nodeShardCounterSerialization, timeService ); - - - final Id id = IdGenerator.createId( "test" ); - final Shard shard = new Shard( 0, 0, true ); - final String type = "type"; - final String type2 = "subType"; - - final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 ); - - long count = approximation.getCount( scope, shard, directedEdgeMeta ); - - waitForFlush( approximation ); - - assertEquals( 0, count ); - } - - - @Ignore("outdated and no longer relevant test") - @Test - public void testSingleShardMultipleThreads() throws ExecutionException, InterruptedException { - - - NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization(); - - final NodeShardApproximation approximation = - new NodeShardApproximationImpl( new TestGraphFig(), serialization, new TestTimeService() ); - - - final int increments = 1000000; - final int workers = Runtime.getRuntime().availableProcessors() * 2; - - final Id id = IdGenerator.createId( "test" ); - final String type = "type"; - final String type2 = "subType"; - - final Shard shard = new Shard( 10000, 0, true ); - - final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 ); - - ExecutorService executor = Executors.newFixedThreadPool( workers ); - - List<Future<Long>> futures = new ArrayList<>( workers ); - - for ( int i = 0; i < workers; i++ ) { - - final Future<Long> future = executor.submit( new Callable<Long>() { - @Override - public Long call() throws Exception { - - for ( int i = 0; i < increments; i++ ) { - approximation.increment( scope, shard, 1, directedEdgeMeta ); - } - - return 0l; - } - } ); - - futures.add( future ); - } - - - for ( Future<Long> future : futures ) { - future.get(); - } - - waitForFlush( approximation ); - //get our count. It should be accurate b/c we only have 1 instance - - final long returnedCount = approximation.getCount( scope, shard, directedEdgeMeta ); - final long expected = workers * increments; - - - assertEquals( expected, returnedCount ); - - //test we get nothing with the other type - - final long emptyCount = - approximation.getCount( scope, shard, DirectedEdgeMeta.fromSourceNodeTargetType( id, type, type2 ) ); - - - assertEquals( 0, emptyCount ); - } - - - @Ignore("outdated and no longer relevant test") - @Test - public void testMultipleShardMultipleThreads() throws ExecutionException, InterruptedException { - - - NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization(); - - final NodeShardApproximation approximation = - new NodeShardApproximationImpl( new TestGraphFig(), serialization, new TestTimeService() ); - - - final int increments = 1000000; - final int workers = Runtime.getRuntime().availableProcessors() * 2; - - final Id id = IdGenerator.createId( "test" ); - final String type = "type"; - final String type2 = "subType"; - - final AtomicLong shardIdCounter = new AtomicLong(); - - - final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 ); - - - ExecutorService executor = Executors.newFixedThreadPool( workers ); - - List<Future<Shard>> futures = new ArrayList<>( workers ); - - for ( int i = 0; i < workers; i++ ) { - - final Future<Shard> future = executor.submit( new Callable<Shard>() { - @Override - public Shard call() throws Exception { - - final long threadShardId = shardIdCounter.incrementAndGet(); - - final Shard shard = new Shard( threadShardId, 0, true ); - - for ( int i = 0; i < increments; i++ ) { - approximation.increment( scope, shard, 1, directedEdgeMeta ); - } - - return shard; - } - } ); - - futures.add( future ); - } - - - for ( Future<Shard> future : futures ) { - final Shard shardId = future.get(); - - waitForFlush( approximation ); - - final long returnedCount = approximation.getCount( scope, shardId, directedEdgeMeta ); - - assertEquals( increments, returnedCount ); - } - } - - - private void waitForFlush( NodeShardApproximation approximation ) throws InterruptedException { - - approximation.beginFlush(); - - while ( approximation.flushPending() ) { - - LOG.info( "Waiting on beginFlush to complete" ); - - Thread.sleep( 100 ); - } - } - - - /** - * These are created b/c we can't use Mockito. It OOM's with keeping track of all the mock invocations - */ - - private static class TestNodeShardCounterSerialization implements NodeShardCounterSerialization { - - private Counter copy = new Counter(); - - - @Override - public MutationBatch flush( final Counter counter ) { - copy.merge( counter ); - return new TestMutationBatch(); - } - - - @Override - public long getCount( final ShardKey key ) { - return copy.get( key ); - } - - - @Override - public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - } - - - /** - * Simple test mutation to no-op during tests - */ - private - static class TestMutationBatch implements MutationBatch { - - @Override - public <K, C> ColumnListMutation<C> withRow( final ColumnFamily<K, C> columnFamily, final K rowKey ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public <K> void deleteRow( final Iterable<? extends ColumnFamily<K, ?>> columnFamilies, final K rowKey ) { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public void discardMutations() { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public void mergeShallow( final MutationBatch other ) { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public boolean isEmpty() { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public int getRowCount() { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public Map<ByteBuffer, Set<String>> getRowKeys() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch pinToHost( final Host host ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch setConsistencyLevel( final ConsistencyLevel consistencyLevel ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch withConsistencyLevel( final ConsistencyLevel consistencyLevel ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch withRetryPolicy( final RetryPolicy retry ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch usingWriteAheadLog( final WriteAheadLog manager ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch lockCurrentTimestamp() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch setTimeout( final long timeout ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch setTimestamp( final long timestamp ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch withTimestamp( final long timestamp ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch withAtomicBatch( final boolean condition ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public ByteBuffer serialize() throws Exception { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public void deserialize( final ByteBuffer data ) throws Exception { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public OperationResult<Void> execute() throws ConnectionException { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public ListenableFuture<OperationResult<Void>> executeAsync() throws ConnectionException { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - } - - - private static class TestGraphFig implements GraphFig { - - @Override - public int getScanPageSize() { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public int getRepairConcurrentSize() { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public double getShardRepairChance() { - return 0; - } - - - @Override - public long getShardSize() { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public long getShardCacheTimeout() { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public long getShardMinDelta() { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public long getShardCacheSize() { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public int getShardCacheRefreshWorkerCount() { - return 0; - } - - - @Override - public int getShardAuditWorkerCount() { - return 0; - } - - - @Override - public int getShardAuditWorkerQueueSize() { - return 0; - } - - - @Override - public long getCounterFlushCount() { - return 100000l; - } - - - @Override - public long getCounterFlushInterval() { - return 30000l; - } - - - @Override - public int getCounterFlushQueueSize() { - return 10000; - } - - - @Override - public void addPropertyChangeListener( final PropertyChangeListener propertyChangeListener ) { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public void removePropertyChangeListener( final PropertyChangeListener propertyChangeListener ) { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public OptionState[] getOptions() { - return new OptionState[0]; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public OptionState getOption( final String s ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public String getKeyByMethod( final String s ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public Object getValueByMethod( final String s ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public Properties filterOptions( final Properties properties ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public Map<String, Object> filterOptions( final Map<String, Object> stringObjectMap ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public void override( final String s, final String s2 ) { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public boolean setOverrides( final Overrides overrides ) { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public Overrides getOverrides() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public void bypass( final String s, final String s2 ) { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public boolean setBypass( final Bypass bypass ) { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public Bypass getBypass() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public Class getFigInterface() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public boolean isSingleton() { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - } - - - private static class TestTimeService implements TimeService { - - @Override - public long getCurrentTime() { - return System.currentTimeMillis(); - } - } -}
