http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java index 185cfb7,3dbf1ec..c4c083f --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java @@@ -23,8 -23,7 +23,10 @@@ import java.util.Collections import java.util.Iterator; import java.util.UUID; -import com.netflix.astyanax.model.ConsistencyLevel; +import com.datastax.driver.core.BatchStatement; ++import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; ++ import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@@ -49,7 -47,7 +50,6 @@@ import org.apache.usergrid.persistence. import org.apache.usergrid.persistence.model.util.UUIDGenerator; import com.google.inject.Inject; --import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; import static org.junit.Assert.assertEquals; @@@ -365,4 -341,232 +365,229 @@@ public abstract class UniqueValueSerial } + /** + * Test that inserting duplicates always returns the oldest entity UUID (the version returned for that entity may change). 
+ * + * @throws ConnectionException + * @throws InterruptedException + */ + @Test + public void testWritingDuplicates() throws ConnectionException, InterruptedException { + + ApplicationScope scope = + new ApplicationScopeImpl( new SimpleId( "organization" ) ); + + IntegerField field = new IntegerField( "count", 5 ); + Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + + + + UUID version1 = UUIDGenerator.newTimeUUID(); + UUID version2 = UUIDGenerator.newTimeUUID(); + + UniqueValue stored1 = new UniqueValueImpl( field, entityId1, version2 ); + UniqueValue stored2 = new UniqueValueImpl( field, entityId2, version1 ); + + - strategy.write( scope, stored1 ).execute(); - strategy.write( scope, stored2 ).execute(); ++ session.execute(strategy.writeCQL( scope, stored1, -1 )); ++ session.execute(strategy.writeCQL( scope, stored2, -1 )); + + // load descending; expect the oldest entity UUID (entityId1) even though entityId2 holds the older version - UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM, ++ UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM, + entityId1.getType(), Collections.<Field>singleton( field ), true); + + UniqueValue retrieved = fields.getValue( field.getName() ); + + // validate that the first entity UUID is returned after inserting a duplicate mapping + assertEquals( stored1, retrieved ); + + + + UUID version3 = UUIDGenerator.newTimeUUID(); + UniqueValue stored3 = new UniqueValueImpl( field, entityId2, version3); - strategy.write( scope, stored3 ).execute(); ++ session.execute(strategy.writeCQL( scope, stored3, -1 )); + + // load the values again, we should still only get back the original unique value - fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM, ++ fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM, + entityId1.getType(), Collections.<Field>singleton( field ), true); + + retrieved = fields.getValue( 
field.getName() ); + + // validate that the first entity UUID is still returned after inserting duplicate with newer version + assertEquals( stored1, retrieved ); + + + UUID version4 = UUIDGenerator.newTimeUUID(); + UniqueValue stored4 = new UniqueValueImpl( field, entityId1, version4); - strategy.write( scope, stored4 ).execute(); ++ session.execute(strategy.writeCQL( scope, stored4, -1 )); + + // load the values again, now we should get the latest version of the original UUID written - fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM, ++ fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM, + entityId1.getType(), Collections.<Field>singleton( field ), true); + + retrieved = fields.getValue( field.getName() ); + + // validate that the first entity UUID is still returned, but with the latest version + assertEquals( stored4, retrieved ); + + } + + /** + * Test that inserting multiple versions of the same entity UUID results in the latest version being returned. + * + * @throws ConnectionException + * @throws InterruptedException + */ + @Test + public void testMultipleVersionsSameEntity() throws ConnectionException, InterruptedException { + + ApplicationScope scope = + new ApplicationScopeImpl( new SimpleId( "organization" ) ); + + IntegerField field = new IntegerField( "count", 5 ); + Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + + + + UUID version1 = UUIDGenerator.newTimeUUID(); + UUID version2 = UUIDGenerator.newTimeUUID(); + + UniqueValue stored1 = new UniqueValueImpl( field, entityId1, version1 ); + UniqueValue stored2 = new UniqueValueImpl( field, entityId1, version2 ); + + - strategy.write( scope, stored1 ).execute(); - strategy.write( scope, stored2 ).execute(); ++ session.execute(strategy.writeCQL( scope, stored1, -1 )); ++ session.execute(strategy.writeCQL( scope, stored2, -1 )); + + // load descending; with only one entity UUID, its latest version (stored2) is expected - UniqueValueSet fields = strategy.load( scope, 
ConsistencyLevel.CL_LOCAL_QUORUM, ++ UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM, + entityId1.getType(), Collections.<Field>singleton( field ), true); + + UniqueValue retrieved = fields.getValue( field.getName() ); + Assert.assertNotNull( retrieved ); + assertEquals( stored2, retrieved ); + + + } + + @Test + public void testDuplicateEntitiesDescending() throws ConnectionException, InterruptedException { + + ApplicationScope scope = + new ApplicationScopeImpl( new SimpleId( "organization" ) ); + + IntegerField field = new IntegerField( "count", 5 ); + Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + Id entityId3 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + + + + UUID version1 = UUIDGenerator.newTimeUUID(); + UUID version2 = UUIDGenerator.newTimeUUID(); + UUID version3 = UUIDGenerator.newTimeUUID(); + + UniqueValue stored1 = new UniqueValueImpl( field, entityId3, version1 ); + UniqueValue stored2 = new UniqueValueImpl( field, entityId2, version2 ); + UniqueValue stored3 = new UniqueValueImpl( field, entityId1, version3 ); + + - strategy.write( scope, stored1 ).execute(); - strategy.write( scope, stored2 ).execute(); - strategy.write( scope, stored3 ).execute(); ++ session.execute(strategy.writeCQL( scope, stored1, -1 )); ++ session.execute(strategy.writeCQL( scope, stored2, -1 )); ++ session.execute(strategy.writeCQL( scope, stored3, -1 )); + + + // load descending; expect the oldest entity UUID (entityId1), even though it was written last - UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM, ++ UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM, + entityId1.getType(), Collections.<Field>singleton( field ), true); + + - fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM, - entityId1.getType(), Collections.<Field>singleton( field ), false); - + UniqueValue retrieved = 
fields.getValue( field.getName() ); + assertEquals( stored3, retrieved ); + + + } + + @Test + public void testDuplicateEntitiesAscending() throws ConnectionException, InterruptedException { + + ApplicationScope scope = + new ApplicationScopeImpl( new SimpleId( "organization" ) ); + + IntegerField field = new IntegerField( "count", 5 ); + Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + Id entityId3 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + + + + UUID version1 = UUIDGenerator.newTimeUUID(); + UUID version2 = UUIDGenerator.newTimeUUID(); + UUID version3 = UUIDGenerator.newTimeUUID(); + + UniqueValue stored1 = new UniqueValueImpl( field, entityId1, version1 ); + UniqueValue stored2 = new UniqueValueImpl( field, entityId2, version2 ); + UniqueValue stored3 = new UniqueValueImpl( field, entityId3, version3 ); + + - strategy.write( scope, stored1 ).execute(); - strategy.write( scope, stored2 ).execute(); - strategy.write( scope, stored3 ).execute(); ++ session.execute(strategy.writeCQL( scope, stored1, -1 )); ++ session.execute(strategy.writeCQL( scope, stored2, -1 )); ++ session.execute(strategy.writeCQL( scope, stored3, -1 )); + + + // load descending; expect the oldest entity UUID (entityId1), which was also written first + UniqueValueSet fields = strategy.load( scope, - ConsistencyLevel.CL_LOCAL_QUORUM, entityId1.getType(), Collections.<Field>singleton( field ), true); ++ ConsistencyLevel.LOCAL_QUORUM, entityId1.getType(), Collections.<Field>singleton( field ), true); + + UniqueValue retrieved = fields.getValue( field.getName() ); + assertEquals( stored1, retrieved ); + + + } + + @Test + public void testMixedDuplicates() throws ConnectionException, InterruptedException { + + ApplicationScope scope = + new ApplicationScopeImpl( new SimpleId( "organization" ) ); + + IntegerField field = new IntegerField( "count", 5 ); + Id entityId1 = new SimpleId( 
UUIDGenerator.newTimeUUID(), "entity" ); + Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + Id entityId3 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + + + + UUID version1 = UUIDGenerator.newTimeUUID(); + UUID version2 = UUIDGenerator.newTimeUUID(); + UUID version3 = UUIDGenerator.newTimeUUID(); + UUID version4 = UUIDGenerator.newTimeUUID(); + UUID version5 = UUIDGenerator.newTimeUUID(); + + UniqueValue stored1 = new UniqueValueImpl( field, entityId1, version5 ); + UniqueValue stored2 = new UniqueValueImpl( field, entityId2, version4 ); + UniqueValue stored3 = new UniqueValueImpl( field, entityId1, version3 ); + UniqueValue stored4 = new UniqueValueImpl( field, entityId3, version2 ); + UniqueValue stored5 = new UniqueValueImpl( field, entityId3, version1 ); + + + - strategy.write( scope, stored1 ).execute(); - strategy.write( scope, stored2 ).execute(); - strategy.write( scope, stored3 ).execute(); - strategy.write( scope, stored4 ).execute(); - strategy.write( scope, stored5 ).execute(); ++ session.execute(strategy.writeCQL( scope, stored1, -1 )); ++ session.execute(strategy.writeCQL( scope, stored2, -1 )); ++ session.execute(strategy.writeCQL( scope, stored3, -1 )); ++ session.execute(strategy.writeCQL( scope, stored4, -1 )); ++ session.execute(strategy.writeCQL( scope, stored5, -1 )); + + + // load descending; expect the oldest entity UUID (entityId1) at its latest version (version5) - UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM, ++ UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM, + entityId1.getType(), Collections.<Field>singleton( field ), true); + + UniqueValue retrieved = fields.getValue( field.getName() ); + assertEquals( stored1, retrieved ); + + + } + }
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties ---------------------------------------------------------------------- diff --cc stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties index 92d0041,d404b1e..ebae735 --- a/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties +++ b/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties @@@ -1,7 -1,26 +1,25 @@@ + # Licensed to the Apache Software Foundation (ASF) under one + # or more contributor license agreements. See the NOTICE file + # distributed with this work for additional information + # regarding copyright ownership. The ASF licenses this file + # to you under the Apache License, Version 2.0 (the + # "License"); you may not use this file except in compliance + # with the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, + # software distributed under the License is distributed on an + # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + # KIND, either express or implied. See the License for the + # specific language governing permissions and limitations + # under the License. 
+ # + # These are for CHOP environment settings -cassandra.connections=20 +cassandra.connections=50 cassandra.port=9160 -cassandra.version=1.2 # a comma delimited private IP address list to your chop cassandra cluster # define this in your settings.xml and have it as an always active profile http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/common/pom.xml ---------------------------------------------------------------------- diff --cc stack/corepersistence/common/pom.xml index c389a5b,bbcadff..63d339b --- a/stack/corepersistence/common/pom.xml +++ b/stack/corepersistence/common/pom.xml @@@ -57,9 -73,9 +73,13 @@@ <version>${cassandra.version}</version> <exclusions> <exclusion> + <groupId>net.jpountz.lz4</groupId> + <artifactId>*</artifactId> + </exclusion> ++ <exclusion> + <artifactId>netty</artifactId> + <groupId>io.netty</groupId> + </exclusion> </exclusions> </dependency> http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java index 3c58dfb,0000000..2996465 mode 100644,000000..100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java @@@ -1,225 -1,0 +1,239 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.core; + + +import org.safehaus.guicyfig.Default; +import org.safehaus.guicyfig.FigSingleton; +import org.safehaus.guicyfig.GuicyFig; +import org.safehaus.guicyfig.Key; + + +/** + * Cassandra configuration interface. + */ +@FigSingleton +public interface CassandraFig extends GuicyFig { + + // cassandra properties used by datastax driver + String READ_CL = "cassandra.readcl"; + String READ_CL_CONSISTENT = "cassandra.readcl.consistent"; + String WRITE_CL = "cassandra.writecl"; + String STRATEGY = "cassandra.strategy"; + String STRATEGY_OPTIONS = "cassandra.strategy.options"; + + // main application cassandra properties + String ASTYANAX_READ_CONSISTENT_CL = "usergrid.consistent.read.cl"; + String ASTYANAX_READ_CL = "usergrid.read.cl"; + String ASTYANAX_WRITE_CL = "usergrid.write.cl"; + String SHARD_VALUES = "cassandra.shardvalues"; + String THRIFT_TRANSPORT_SIZE = "cassandra.thrift.transport.frame"; + String USERNAME = "cassandra.username"; + String PASSWORD = "cassandra.password"; + + // locks cassandra properties + String LOCKS_KEYSPACE_NAME = "cassandra.lock.keyspace"; + String LOCKS_KEYSPACE_REPLICATION = "cassandra.lock.keyspace.replication"; + String LOCKS_KEYSPACE_STRATEGY = "cassandra.lock.keyspace.strategy"; + String LOCKS_CL = "cassandra.lock.cl"; + String LOCKS_SHARED_POOL_FLAG = "cassandra.lock.use_shared_pool"; + String 
LOCKS_CONNECTIONS = "cassandra.lock.connections"; + String LOCKS_EXPIRATION = "cassandra.lock.expiration.milliseconds"; + - - ++ String LOCK_MANAGER_INIT_RETRIES = "cassandra.lock.init.retries"; ++ String LOCK_MANAGER_INIT_INTERVAL = "cassandra.lock.init.interval"; + + // re-usable default values + String DEFAULT_CONNECTION_POOLSIZE = "15"; + String DEFAULT_LOCKS_EXPIRATION = "3600000"; // 1 hour + String DEFAULT_LOCAL_DC = ""; + String DEFAULT_USERNAME = ""; + String DEFAULT_PASSWORD = ""; + + + @Key( "cassandra.hosts" ) + String getHosts(); + + /** + * Valid options are 1.2, 2.0, 2.1 + * + * @return the configured Cassandra version + */ + @Key( "cassandra.version" ) + @Default( "2.1" ) + String getVersion(); + + @Key( "cassandra.cluster_name" ) + @Default( "Usergrid" ) + String getClusterName(); + + @Key( "cassandra.keyspace.application" ) + @Default( "Usergrid_Applications" ) + String getApplicationKeyspace(); + + @Key( "cassandra.port" ) + @Default( "9160" ) + int getThriftPort(); + + @Key( USERNAME ) + @Default( DEFAULT_USERNAME ) + String getUsername(); + + @Key( PASSWORD ) + @Default( DEFAULT_PASSWORD ) + String getPassword(); + + @Key( "cassandra.datacenter.local" ) + @Default( DEFAULT_LOCAL_DC ) + String getLocalDataCenter(); + + @Key( "cassandra.connections" ) + @Default( DEFAULT_CONNECTION_POOLSIZE ) + int getConnections(); + + @Key( "cassandra.timeout" ) + @Default( "10000" ) + int getTimeout(); + + @Key( "cassandra.timeout.pool" ) + @Default( "5000" ) + int getPoolTimeout(); + + @Key("cassandra.discovery") + @Default( "RING_DESCRIBE" ) + String getDiscoveryType(); + + + @Default("CL_LOCAL_QUORUM") + @Key(ASTYANAX_READ_CL) + String getAstyanaxReadCL(); + + @Default("CL_QUORUM") + @Key(ASTYANAX_READ_CONSISTENT_CL) + String getAstyanaxConsistentReadCL(); + + @Default("CL_LOCAL_QUORUM") + @Key(ASTYANAX_WRITE_CL) + String getAstyanaxWriteCL(); + + + @Default("LOCAL_QUORUM") + @Key(READ_CL) + String getReadCl(); + + @Default("QUORUM") + @Key(READ_CL_CONSISTENT) + String 
getReadClConsistent(); + + @Default("LOCAL_QUORUM") + @Key(WRITE_CL) + String getWriteCl(); + + @Default("SimpleStrategy") + @Key( STRATEGY ) + String getStrategy(); + + @Default("replication_factor:1") + @Key( STRATEGY_OPTIONS ) + String getStrategyOptions(); + + /** + * Return the history of all shard values which are immutable. For instance, if shard values + * are initially set to 20 (the default) then increased to 40, the property should contain the string of + * "20, 40" so that we can read historic data. + * + * @return the comma-delimited history of shard values + */ + @Default("20") + @Key(SHARD_VALUES) + String getShardValues(); + + /** + * Get the thrift transport size. Should be set to what is on the cassandra servers. As we move to CQL, this will become obsolete + * @return the thrift transport frame size + */ + @Key( THRIFT_TRANSPORT_SIZE) + @Default( "15728640" ) + int getThriftBufferSize(); + + + /** + * Returns the name of the keyspace that should be used for Locking + */ + @Key( LOCKS_KEYSPACE_NAME ) + @Default("Locks") + String getLocksKeyspace(); + + /** + * Returns the Astyanax consistency level for writing a Lock + */ + @Key(LOCKS_CL) + @Default("CL_LOCAL_QUORUM") + String getLocksCl(); + + /** + * Returns a flag on whether or not to share the connection pool with other keyspaces + */ + @Key( LOCKS_SHARED_POOL_FLAG ) + @Default("true") + boolean useSharedPoolForLocks(); + + /** + * Returns the number of connections (pool size) to use for the locks keyspace + */ + @Key( LOCKS_CONNECTIONS ) + @Default( DEFAULT_CONNECTION_POOLSIZE ) + int getConnectionsLocks(); + + /** + * Returns the replication options for the locks keyspace, e.g. "replication_factor:1" + */ + @Key( LOCKS_KEYSPACE_REPLICATION ) + @Default("replication_factor:1") + String getLocksKeyspaceReplication(); + + /** + * Returns the fully-qualified replication strategy class for the locks keyspace + */ + @Key( LOCKS_KEYSPACE_STRATEGY ) + @Default( "org.apache.cassandra.locator.SimpleStrategy" ) + String 
getLocksKeyspaceStrategy(); + + /** + * Return 
the expiration (in milliseconds) that should be used for expiring a lock if it's not released + */ + @Key( LOCKS_EXPIRATION ) + @Default(DEFAULT_LOCKS_EXPIRATION) + int getLocksExpiration(); + ++ /** ++ * How many times to attempt lock keyspace and column family creation ++ */ ++ @Key( LOCK_MANAGER_INIT_RETRIES ) ++ @Default( "100" ) ++ int getLockManagerInitRetries(); ++ ++ /** ++ * How long to wait between lock keyspace and column family creation attempts (presumably milliseconds; confirm against the lock manager) ++ */ ++ @Key( LOCK_MANAGER_INIT_INTERVAL ) ++ @Default( "1000" ) ++ int getLockManagerInitInterval(); ++ +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/common/src/test/resources/usergrid-UNIT.properties ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/rest/src/test/resources/usergrid-custom-test.properties ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java ---------------------------------------------------------------------- diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java index 0000000,68f366b..40cedcd mode 000000,100644..100644 --- a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java +++ 
b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java @@@ -1,0 -1,298 +1,298 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.usergrid.tools; + + + import java.util.*; + import java.util.concurrent.atomic.AtomicInteger; + ++import com.datastax.driver.core.ConsistencyLevel; + import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + import com.netflix.astyanax.model.Column; -import com.netflix.astyanax.model.ConsistencyLevel; + import com.netflix.astyanax.util.RangeBuilder; + import org.apache.usergrid.persistence.Entity; + import org.apache.usergrid.persistence.EntityManager; + import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; + import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; + import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; + import org.apache.usergrid.persistence.collection.serialization.impl.*; + import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; + import org.apache.usergrid.persistence.model.entity.SimpleId; + import org.apache.usergrid.persistence.model.field.StringField; + import 
org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import org.apache.commons.cli.CommandLine; + import org.apache.commons.cli.Option; + import org.apache.commons.cli.OptionBuilder; + import org.apache.commons.cli.Options; + + + import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily; + import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; + import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer; + + + + public class UniqueValueScanner extends ToolBase { + + private static final Logger logger = LoggerFactory.getLogger( UniqueValueScanner.class ); + + private static final String APPLICATION_ARG = "app"; + + private static final String ENTITY_TYPE_ARG = "entityType"; + + private static final String ENTITY_NAME_ARG = "entityName"; + + private static final String ENTITY_FIELD_TYPE_ARG = "fieldType"; + + + + //copied shamelessly from unique value serialization strat. + private static final ScopedRowKeySerializer<TypeField> ROW_KEY_SER = + new ScopedRowKeySerializer<>( UniqueTypeFieldRowKeySerializer.get() ); + + + private final EntityVersionSerializer ENTITY_VERSION_SER = new EntityVersionSerializer(); + + private final MultiTenantColumnFamily<ScopedRowKey<TypeField>, EntityVersion> CF_UNIQUE_VALUES = + new MultiTenantColumnFamily<>( "Unique_Values_V2", ROW_KEY_SER, ENTITY_VERSION_SER ); + + private com.netflix.astyanax.Keyspace keyspace; + + private MvccEntitySerializationStrategy mvccEntitySerializationStrategy; + + private UniqueValueSerializationStrategy uniqueValueSerializationStrategy; + + private EntityManager em; + + @Override + @SuppressWarnings( "static-access" ) + public Options createOptions() { + + + Options options = super.createOptions(); + + + Option appOption = OptionBuilder.withArgName( APPLICATION_ARG ).hasArg().isRequired( true ) + .withDescription( "application id" ).create( APPLICATION_ARG ); + + + options.addOption( appOption ); + + Option collectionOption = + 
OptionBuilder.withArgName(ENTITY_TYPE_ARG).hasArg().isRequired( true ).withDescription( "collection name" ) + .create(ENTITY_TYPE_ARG); + + options.addOption( collectionOption ); + + Option specificEntityNameOption = + OptionBuilder.withArgName(ENTITY_NAME_ARG).hasArg().isRequired( false ).withDescription( "specific entity name" ) + .create(ENTITY_NAME_ARG); + + options.addOption( specificEntityNameOption ); + + Option fieldTypeOption = + OptionBuilder.withArgName(ENTITY_FIELD_TYPE_ARG).hasArg().isRequired( false ).withDescription( "field type" ) + .create(ENTITY_FIELD_TYPE_ARG); + + options.addOption( fieldTypeOption ); + + return options; + } + + + /* + * (non-Javadoc) + * + * @see + * org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine) + */ + @Override + public void runTool( CommandLine line ) throws Exception { + + startSpring(); + + UUID appToFilter = null; + if (!line.getOptionValue(APPLICATION_ARG).isEmpty()) { + appToFilter = UUID.fromString(line.getOptionValue(APPLICATION_ARG)); + } + - logger.info("Staring Tool: UniqueValueScanner"); ++ logger.info("Starting Tool: UniqueValueScanner"); + logger.info("Using Cassandra consistency level: {}", System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM")); + + + keyspace = injector.getInstance(com.netflix.astyanax.Keyspace.class); + mvccEntitySerializationStrategy = injector.getInstance(MvccEntitySerializationStrategy.class); + uniqueValueSerializationStrategy = injector.getInstance(UniqueValueSerializationStrategy.class); + + String fieldType = + line.getOptionValue(ENTITY_FIELD_TYPE_ARG) != null ? 
line.getOptionValue(ENTITY_FIELD_TYPE_ARG) : "name" ; + String entityType = line.getOptionValue(ENTITY_TYPE_ARG); + String entityName = line.getOptionValue(ENTITY_NAME_ARG); + + AtomicInteger count = new AtomicInteger(0); + + if (entityName != null && !entityName.isEmpty()) { + + if(appToFilter == null){ + throw new RuntimeException("Cannot execute UniqueValueScanner with specific entity without the " + + "application UUID for which the entity should exist."); + } + + if(entityType == null){ + throw new RuntimeException("Cannot execute UniqueValueScanner without the entity type (singular " + + "collection name)."); + } + + logger.info("Running entity unique load only"); + + + //do stuff w/o read repair + UniqueValueSet uniqueValueSet = uniqueValueSerializationStrategy.load( + new ApplicationScopeImpl( new SimpleId(appToFilter, "application" ) ), - ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM")), entityType, ++ ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "LOCAL_QUORUM").replaceFirst("^CL_", "")), entityType, // also accept Astyanax-style names such as CL_LOCAL_QUORUM (same property is read with the Astyanax enum below) + Collections.singletonList(new StringField( fieldType, entityName) ), false); + + StringBuilder stringBuilder = new StringBuilder(); + + stringBuilder.append("["); + + uniqueValueSet.forEach( uniqueValue -> { + + + String entry = "fieldName="+uniqueValue.getField().getName()+ + ", fieldValue="+uniqueValue.getField().getValue()+ + ", uuid="+uniqueValue.getEntityId().getUuid()+ + ", type="+uniqueValue.getEntityId().getType()+ + ", version="+uniqueValue.getEntityVersion(); + stringBuilder.append("{").append(entry).append("},"); + }); + - stringBuilder.deleteCharAt(stringBuilder.length() -1); ++ if (stringBuilder.length() > 1) { stringBuilder.deleteCharAt(stringBuilder.length() - 1); } // strip only the trailing comma; an empty set must still print "[]" + stringBuilder.append("]"); + + logger.info("Returned unique value set 
from serialization load = {}", stringBuilder.toString()); + + } else { + + logger.info("Running entity unique scanner only"); + + + // scan through all unique values and log some info + + Iterator<com.netflix.astyanax.model.Row<ScopedRowKey<TypeField>, EntityVersion>> rows = 
null; + try { + + rows = keyspace.prepareQuery(CF_UNIQUE_VALUES) - .setConsistencyLevel(ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM"))) ++ .setConsistencyLevel(com.netflix.astyanax.model.ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM").replaceFirst("^(?!CL_)", "CL_"))) // also accept datastax-style names such as LOCAL_QUORUM + .getAllRows() + .withColumnRange(new RangeBuilder().setLimit(1000).build()) + .execute().getResult().iterator(); + + } catch (ConnectionException e) { + + logger.error("Error connecting to cassandra", e); + } + + + UUID finalAppToFilter = appToFilter; + + if( rows != null) { + rows.forEachRemaining(row -> { + + count.incrementAndGet(); + + if(count.get() % 1000 == 0 ){ + logger.info("Scanned {} rows in {}", count.get(), CF_UNIQUE_VALUES.getName()); + } + + final String fieldName = row.getKey().getKey().getField().getName(); + final String fieldValue = row.getKey().getKey().getField().getValue().toString(); + final String scopeType = row.getKey().getScope().getType(); + final UUID scopeUUID = row.getKey().getScope().getUuid(); + + + if (!fieldName.equalsIgnoreCase(fieldType) || + (finalAppToFilter != null && !finalAppToFilter.equals(scopeUUID)) + ) { + // do nothing + + } else { + + + // if we have more than 1 column, let's check for a duplicate + if (row.getColumns() != null && row.getColumns().size() > 1) { + + final List<EntityVersion> values = new ArrayList<>(row.getColumns().size()); + + Iterator<Column<EntityVersion>> columns = row.getColumns().iterator(); + columns.forEachRemaining(column -> { + + + final EntityVersion entityVersion = column.getName(); + + + logger.trace( + scopeType + ": " + scopeUUID + ", " + + fieldName + ": " + fieldValue + ", " + + "entity type: " + entityVersion.getEntityId().getType() + ", " + + "entity uuid: " + entityVersion.getEntityId().getUuid() + ); + + + if (entityType != null && + entityVersion.getEntityId().getType().equalsIgnoreCase(entityType) + ) { + + // add 
the first value into the list + if (values.size() == 0) { + 
+ values.add(entityVersion); + + + } else { + + if (!values.get(0).getEntityId().getUuid().equals(entityVersion.getEntityId().getUuid())) { + + values.add(entityVersion); + + logger.error("Duplicate found for field [{}={}]. Entry 1: [{}], Entry 2: [{}]", + fieldName, fieldValue, values.get(0).getEntityId(), entityVersion.getEntityId()); + + } + + } + + + } + + }); + } + } + + + }); + }else{ + + logger.warn("No rows returned from table: {}", CF_UNIQUE_VALUES.getName()); + + } + + } + } + }
