Better handle different combinations of possible duplicate entities and their respective versions.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/70f50c22 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/70f50c22 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/70f50c22 Branch: refs/heads/master Commit: 70f50c22dfae76471713a6a01eae772b1b7bd9b6 Parents: 5c81a02 Author: Michael Russo <[email protected]> Authored: Wed Jun 22 22:35:59 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Wed Jun 22 22:35:59 2016 -0700 ---------------------------------------------------------------------- .../UniqueValueSerializationStrategyImpl.java | 116 +++++++++++++------ ...niqueValueSerializationStrategyImplTest.java | 115 ++++++++++++++++++ 2 files changed, 197 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/70f50c22/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java index a323746..2a53194 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java @@ -71,6 +71,8 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> public static final int COL_VALUE = 0x0; + private final Comparator<UniqueValue> uniqueValueComparator = new UniqueValueComparator(); + private final SerializationFig serializationFig; protected final Keyspace keyspace; @@ -266,7 +268,7 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> Iterator<Row<ScopedRowKey<FieldKey>, EntityVersion>> results = keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel( consistencyLevel ).getKeySlice( keys ) - .withColumnRange(new RangeBuilder().setLimit(serializationFig.getBufferSize()).build()) + .withColumnRange(new RangeBuilder().setLimit(serializationFig.getMaxLoadSize()).build()) .execute().getResult().iterator(); @@ -285,10 +287,8 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> continue; } - // these used duplicate tracking and cleanup - UUID oldestEntityUUID = null; - UniqueValueImpl firstUniqueValue = null; + List<UniqueValue> candidates = new ArrayList<>(); /** * While iterating the columns, a rule is being enforced to only EVER return the oldest UUID. This means @@ -304,55 +304,77 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> final EntityVersion entityVersion = columnList.next().getName(); - final UUID currentEntityUUID = entityVersion.getEntityId().getUuid(); - final UniqueValueImpl uniqueValue = + final UniqueValue uniqueValue = new UniqueValueImpl(field, entityVersion.getEntityId(), entityVersion.getEntityVersion()); - // keep track of the first and oldest entity (not version), - // knowing the the first one may not be the 'oldest' - if(oldestEntityUUID == null){ + // set the initial candidate and move on + if(candidates.size() == 0){ + candidates.add(uniqueValue); + continue; + } - oldestEntityUUID = currentEntityUUID; - firstUniqueValue = uniqueValue; + final int result = uniqueValueComparator.compare(uniqueValue, candidates.get(candidates.size() -1)); - }else if(currentEntityUUID.timestamp() < oldestEntityUUID.timestamp()){ + if(result == 0){ - oldestEntityUUID = currentEntityUUID; - } + // do nothing, only versions can be newer and we're not worried about newer versions of same entity + if(logger.isTraceEnabled()){ + logger.trace("Candidate unique value is equal to the current unique value"); + } + // update candidate w/ latest version + candidates.add(uniqueValue); - // only return the oldest (original) unique value entries - if(currentEntityUUID.timestamp() <= oldestEntityUUID.timestamp() ){ + }else if(result < 0){ - if (logger.isTraceEnabled()) { - logger.trace("Putting unique value [{}={}] into result set with entity id [{}] and entity version [{}]", - field.getName(), field.getValue().toString(), - entityVersion.getEntityId(), - entityVersion.getEntityVersion()); - } + // delete the duplicate from the unique value index + candidates.forEach(candidate -> { - uniqueValueSet.addValue(uniqueValue); + try { - } - // remove the newer duplicated unique value entries - else{ + logger.warn("Duplicate unique value [{}={}] found, removing older entry " + + "with entity id [{}] and entity version [{}]", field.getName(), field.getValue().toString(), + candidate.getEntityId().getUuid(), candidate.getEntityVersion() ); - delete(appScope, uniqueValue ).execute(); + delete(appScope, candidate ).execute(); + } catch (ConnectionException e) { + // do nothing for now + } - } + }); - } + candidates.clear(); + + if(logger.isTraceEnabled()) { + logger.info("Updating candidate to entity id [{}] and entity version [{}]", + uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion()); + + } - // check to see if the very first unique value entry was overwritten because it was not the oldest - // if so, clean up its unique index - if( !uniqueValueSet.getValue( firstUniqueValue.getField().getName() ) - .getEntityId().getUuid().equals(firstUniqueValue.getEntityId().getUuid()) ){ + // add our new candidate to the list + candidates.add(uniqueValue); + + // add our new candidate to the result set + //uniqueValueSet.addValue(candidate); + + }else{ + + logger.warn("Duplicate unique value [{}={}] found, removing newer entry " + + "with entity id [{}] and entity version [{}]", field.getName(), field.getValue().toString(), + uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion() ); + + // delete the duplicate from the unique value index + delete(appScope, uniqueValue ).execute(); + + + } - delete(appScope, firstUniqueValue).execute(); } + // take the last candidate ( should be the latest version) and add to the result set + uniqueValueSet.addValue(candidates.get(candidates.size() -1)); } @@ -472,4 +494,30 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> * @param uniqueValueId The uniqueValue */ protected abstract EntityKey createEntityUniqueLogKey(final Id applicationId, final Id uniqueValueId ); + + + + private class UniqueValueComparator implements Comparator<UniqueValue> { + + @Override + public int compare(UniqueValue o1, UniqueValue o2) { + + if( o1.getEntityId().getUuid().equals(o2.getEntityId().getUuid())){ + + return 0; + + }else if( o1.getEntityId().getUuid().timestamp() < o2.getEntityId().getUuid().timestamp()){ + + return -1; + + } + + // if the UUIDs are not equal and o1's timestamp is not less than o2's timestamp, + // then o1 must be greater than o2 + return 1; + + + } + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/70f50c22/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java index 6c4e72b..ed3e42b 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java @@ -442,4 +442,119 @@ public abstract class UniqueValueSerializationStrategyImplTest { } + @Test + public void testDuplicateEntitiesDescending() throws ConnectionException, InterruptedException { + + ApplicationScope scope = + new ApplicationScopeImpl( new SimpleId( "organization" ) ); + + IntegerField field = new IntegerField( "count", 5 ); + Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + Id entityId3 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + + + + UUID version1 = UUIDGenerator.newTimeUUID(); + UUID version2 = UUIDGenerator.newTimeUUID(); + UUID version3 = UUIDGenerator.newTimeUUID(); + + UniqueValue stored1 = new UniqueValueImpl( field, entityId3, version1 ); + UniqueValue stored2 = new UniqueValueImpl( field, entityId2, version2 ); + UniqueValue stored3 = new UniqueValueImpl( field, entityId1, version3 ); + + + strategy.write( scope, stored1 ).execute(); + strategy.write( scope, stored2 ).execute(); + strategy.write( scope, stored3 ).execute(); + + + // load descending to get the older version of entity for this unique value + UniqueValueSet fields = strategy.load( scope, entityId1.getType(), Collections.<Field>singleton( field )); + + UniqueValue retrieved = fields.getValue( field.getName() ); + assertEquals( stored3, retrieved ); + + + } + + @Test + public void testDuplicateEntitiesAscending() throws ConnectionException, InterruptedException { + + ApplicationScope scope = + new ApplicationScopeImpl( new SimpleId( "organization" ) ); + + IntegerField field = new IntegerField( "count", 5 ); + Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + Id entityId3 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + + + + UUID version1 = UUIDGenerator.newTimeUUID(); + UUID version2 = UUIDGenerator.newTimeUUID(); + UUID version3 = UUIDGenerator.newTimeUUID(); + + UniqueValue stored1 = new UniqueValueImpl( field, entityId1, version1 ); + UniqueValue stored2 = new UniqueValueImpl( field, entityId2, version2 ); + UniqueValue stored3 = new UniqueValueImpl( field, entityId3, version3 ); + + + strategy.write( scope, stored1 ).execute(); + strategy.write( scope, stored2 ).execute(); + strategy.write( scope, stored3 ).execute(); + + + // load descending to get the older version of entity for this unique value + UniqueValueSet fields = strategy.load( scope, entityId1.getType(), Collections.<Field>singleton( field )); + + UniqueValue retrieved = fields.getValue( field.getName() ); + assertEquals( stored1, retrieved ); + + + } + + @Test + public void testMixedDuplicates() throws ConnectionException, InterruptedException { + + ApplicationScope scope = + new ApplicationScopeImpl( new SimpleId( "organization" ) ); + + IntegerField field = new IntegerField( "count", 5 ); + Id entityId1 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + Id entityId2 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + Id entityId3 = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); + + + + UUID version1 = UUIDGenerator.newTimeUUID(); + UUID version2 = UUIDGenerator.newTimeUUID(); + UUID version3 = UUIDGenerator.newTimeUUID(); + UUID version4 = UUIDGenerator.newTimeUUID(); + UUID version5 = UUIDGenerator.newTimeUUID(); + + UniqueValue stored1 = new UniqueValueImpl( field, entityId1, version5 ); + UniqueValue stored2 = new UniqueValueImpl( field, entityId2, version4 ); + UniqueValue stored3 = new UniqueValueImpl( field, entityId1, version3 ); + UniqueValue stored4 = new UniqueValueImpl( field, entityId3, version2 ); + UniqueValue stored5 = new UniqueValueImpl( field, entityId3, version1 ); + + + + strategy.write( scope, stored1 ).execute(); + strategy.write( scope, stored2 ).execute(); + strategy.write( scope, stored3 ).execute(); + strategy.write( scope, stored4 ).execute(); + strategy.write( scope, stored5 ).execute(); + + + // load descending to get the older version of entity for this unique value + UniqueValueSet fields = strategy.load( scope, entityId1.getType(), Collections.<Field>singleton( field )); + + UniqueValue retrieved = fields.getValue( field.getName() ); + assertEquals( stored1, retrieved ); + + + } + }
