Improvements to CollectionIterator tool: separate options for removing duplicate and orphaned connections
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0632ceff Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0632ceff Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0632ceff Branch: refs/heads/master Commit: 0632ceff5b40b3a23f1561589cb911c3ca45cb6b Parents: c98a5e9 Author: Mike Dunker <[email protected]> Authored: Mon Mar 12 12:19:56 2018 -0700 Committer: Keyur Karnik <[email protected]> Committed: Tue Aug 28 16:41:44 2018 -0700 ---------------------------------------------------------------------- .../persistence/index/utils/UUIDUtils.java | 7 +- .../usergrid/tools/CollectionIterator.java | 80 ++++++++++++++------ .../org/apache/usergrid/tools/ToolBase.java | 26 +++++++ 3 files changed, 89 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/0632ceff/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java index bf8dd3c..1ba9f3c 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java @@ -303,8 +303,11 @@ public class UUIDUtils { if ( uuid == null ) { return 0; } - long t = uuid.timestamp(); - return ( t - KCLOCK_OFFSET ) / KCLOCK_MULTIPLIER_L; + return getUnixTimestampInMillisFromUUIDTimestamp(uuid.timestamp()); + } + + public static long getUnixTimestampInMillisFromUUIDTimestamp( long uuidTimestamp ) { + return ( uuidTimestamp - KCLOCK_OFFSET ) / 
KCLOCK_MULTIPLIER_L; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/0632ceff/stack/tools/src/main/java/org/apache/usergrid/tools/CollectionIterator.java ---------------------------------------------------------------------- diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/CollectionIterator.java b/stack/tools/src/main/java/org/apache/usergrid/tools/CollectionIterator.java index 26b5f5f..bfe5edf 100644 --- a/stack/tools/src/main/java/org/apache/usergrid/tools/CollectionIterator.java +++ b/stack/tools/src/main/java/org/apache/usergrid/tools/CollectionIterator.java @@ -23,23 +23,16 @@ import java.util.*; import com.google.common.base.Optional; import com.netflix.astyanax.MutationBatch; -import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; -import org.apache.usergrid.corepersistence.results.EntityQueryExecutor; -import org.apache.usergrid.corepersistence.results.IdQueryExecutor; -import org.apache.usergrid.corepersistence.service.CollectionSearch; import org.apache.usergrid.corepersistence.service.CollectionService; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.*; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; import org.apache.usergrid.persistence.graph.*; -import org.apache.usergrid.persistence.graph.impl.SimpleEdge; -import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization; import org.apache.usergrid.persistence.index.utils.UUIDUtils; import org.apache.usergrid.persistence.model.entity.*; -import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.schema.CollectionInfo; import org.apache.usergrid.utils.InflectionUtils; import static org.apache.commons.lang.StringUtils.isBlank; @@ 
-63,7 +56,9 @@ public class CollectionIterator extends ToolBase { private static final String ENTITY_TYPE_ARG = "entityType"; - private static final String REMOVE_CONNECTIONS_ARG = "removeConnections"; + private static final String REMOVE_DUPLICATE_CONNECTIONS_ARG = "removeDuplicateConnections"; + + private static final String REMOVE_ORPHAN_CONNECTIONS_ARG = "removeOrphanConnections"; private static final String LATEST_TIMESTAMP_ARG = "latestTimestamp"; @@ -95,11 +90,17 @@ public class CollectionIterator extends ToolBase { options.addOption( collectionOption ); - Option removeConnectionsOption = - OptionBuilder.withArgName(REMOVE_CONNECTIONS_ARG).hasArg().isRequired( false ).withDescription( "remove orphaned connections" ) - .create(REMOVE_CONNECTIONS_ARG); + Option removeOrphanConnectionsOption = + OptionBuilder.withArgName(REMOVE_ORPHAN_CONNECTIONS_ARG).hasArg().isRequired( false ).withDescription( "remove orphaned connections" ) + .create(REMOVE_ORPHAN_CONNECTIONS_ARG); + + options.addOption( removeOrphanConnectionsOption ); + + Option removeDuplicateConnectionsOption = + OptionBuilder.withArgName(REMOVE_DUPLICATE_CONNECTIONS_ARG).hasArg().isRequired( false ).withDescription( "remove duplicate connections" ) + .create(REMOVE_DUPLICATE_CONNECTIONS_ARG); - options.addOption( removeConnectionsOption ); + options.addOption( removeDuplicateConnectionsOption ); Option earliestTimestampOption = OptionBuilder.withArgName(EARLIEST_TIMESTAMP_ARG).hasArg().isRequired( false ).withDescription( "earliest timestamp to delete" ) @@ -137,7 +138,8 @@ public class CollectionIterator extends ToolBase { String applicationOption = line.getOptionValue(APPLICATION_ARG); String entityTypeOption = line.getOptionValue(ENTITY_TYPE_ARG); - String removeConnectionsOption = line.getOptionValue(REMOVE_CONNECTIONS_ARG); + String removeOrphanConnectionsOption = line.getOptionValue(REMOVE_ORPHAN_CONNECTIONS_ARG); + String removeDuplicateConnectionsOption = 
line.getOptionValue(REMOVE_DUPLICATE_CONNECTIONS_ARG); String earliestTimestampOption = line.getOptionValue(EARLIEST_TIMESTAMP_ARG); String latestTimestampOption = line.getOptionValue(LATEST_TIMESTAMP_ARG); String secondsInPastOption = line.getOptionValue(SECONDS_IN_PAST_ARG); @@ -152,7 +154,8 @@ public class CollectionIterator extends ToolBase { } String entityType = entityTypeOption; - final boolean removeOrphans = !isBlank(removeConnectionsOption) && removeConnectionsOption.toLowerCase().equals("yes"); + final boolean removeOrphans = !isBlank(removeOrphanConnectionsOption) && removeOrphanConnectionsOption.toLowerCase().equals("yes"); + final boolean removeDuplicates = !isBlank(removeDuplicateConnectionsOption) && removeDuplicateConnectionsOption.toLowerCase().equals("yes"); if (!isBlank(secondsInPastOption) && !isBlank(latestTimestampOption)) { throw new RuntimeException("Can't specify both latest timestamp and seconds in past options."); @@ -222,32 +225,63 @@ public class CollectionIterator extends ToolBase { new SimpleSearchByEdgeType( applicationScopeId, simpleEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent(), false ); + Set<UUID> uuidSet = new HashSet<>(); + gm.loadEdgesFromSource(search).map(markedEdge -> { UUID uuid = markedEdge.getTargetNode().getUuid(); + long edgeTimestamp = markedEdge.getTimestamp(); + String edgeType = markedEdge.getType(); + boolean duplicate = uuidSet.contains(uuid); + if (!duplicate) { + uuidSet.add(uuid); + } try { EntityRef entityRef = new SimpleEntityRef(entityType, uuid); org.apache.usergrid.persistence.Entity retrieved = em.get(entityRef); long timestamp = 0; + DateFormat df = new SimpleDateFormat(); + df.setTimeZone(TimeZone.getTimeZone("GMT")); String dateString = "NOT TIME-BASED"; if (UUIDUtils.isTimeBased(uuid)){ timestamp = UUIDUtils.getTimestampInMillis(uuid); Date uuidDate = new Date(timestamp); - DateFormat df = new SimpleDateFormat(); - df.setTimeZone(TimeZone.getTimeZone("GMT")); 
dateString = df.format(uuidDate) + " GMT"; } + Date uuidEdgeDate = new Date(UUIDUtils.getUnixTimestampInMillisFromUUIDTimestamp(edgeTimestamp)); + String edgeDateString = df.format(uuidEdgeDate) + " GMT"; + if ( retrieved != null ){ - logger.info("{} - {} - entity data found", uuid, dateString); + if (duplicate) { + if (removeDuplicates) { + logger.info("DUPLICATE ENTITY (REMOVING): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}", + uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString); + try { + MutationBatch batch = edgeSerialization.deleteEdge(applicationScope, markedEdge, UUIDUtils.newTimeUUID()); + logger.info("BATCH: {}", batch); + batch.execute(); + } catch (Exception e) { + logger.error("{} - exception while trying to remove orphaned connection, {}", uuid, e.getMessage()); + } + } else { + logger.info("DUPLICATE ENTITY (WON'T REMOVE): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}", + uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString); + } + + } else { + logger.info("ENTITY: uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}", + uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString); + } + + }else{ if (removeOrphans && timestamp >= earliestTimestamp && timestamp <= latestTimestamp) { - logger.info("{} - {} - entity data NOT found, REMOVING", uuid, dateString); + logger.info("NOT FOUND (REMOVING): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}", + uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString, markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted()); try { - //em.removeItemFromCollection(headEntity, collectionName, entityRef ); - logger.info("entityRef.asId(): {}", entityRef.asId()); MutationBatch batch = edgeSerialization.deleteEdge(applicationScope, markedEdge, UUIDUtils.newTimeUUID()); logger.info("BATCH: {}", batch); 
batch.execute(); @@ -255,9 +289,11 @@ public class CollectionIterator extends ToolBase { logger.error("{} - exception while trying to remove orphaned connection, {}", uuid, e.getMessage()); } } else if (removeOrphans) { - logger.info("{} - {} ({}) - entity data NOT found, not removing because timestamp not in range", uuid, dateString, timestamp); + logger.info("NOT FOUND (TIMESTAMP OUT OF RANGE): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}", + uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString, markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted()); } else { - logger.info("{} - {} ({}) - entity data NOT found", uuid, dateString, timestamp); + logger.info("NOT FOUND: uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}", + uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString, markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted()); } } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/0632ceff/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java ---------------------------------------------------------------------- diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java index 62636ea..97a7d5a 100644 --- a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java +++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java @@ -105,11 +105,29 @@ public abstract class ToolBase { if ( line.hasOption( "host" ) ) { System.setProperty( "cassandra.url", line.getOptionValue( "host" ) ); + } + + if ( line.hasOption( "eshost" ) ) { System.setProperty( "elasticsearch.hosts", line.getOptionValue( "eshost" ) ); + } + + if ( line.hasOption( "escluster" ) ) { 
System.setProperty( "elasticsearch.cluster_name", line.getOptionValue( "escluster" ) ); + } + + if ( line.hasOption( "ugcluster" ) ) { System.setProperty( "usergrid.cluster_name", line.getOptionValue( "ugcluster" ) ); } + if ( line.hasOption( "appkeyspace" ) ) { + System.setProperty( "cassandra.keyspace.application", line.getOptionValue( "appkeyspace" ) ); + } + + if ( line.hasOption( "lockskeyspace" ) ) { + System.setProperty( "cassandra.lock.keyspace", line.getOptionValue( "lockskeyspace" ) ); + } + + try { runTool( line ); } @@ -153,6 +171,12 @@ public abstract class ToolBase { Option remoteOption = OptionBuilder .withDescription( "Use remote Cassandra instance" ).create( "remote" ); + Option ugAppKeyspace = OptionBuilder.withArgName( "appkeyspace" ).hasArg() + .withDescription( "Usergrid Application keyspace" ).create( "appkeyspace" ); + + Option ugLocksKeyspace = OptionBuilder.withArgName( "lockskeyspace" ).hasArg() + .withDescription( "Usergrid Locks keyspace" ).create( "lockskeyspace" ); + Option verbose = OptionBuilder .withDescription( "Print on the console an echo of the content written to the file" ) .create( VERBOSE ); @@ -163,6 +187,8 @@ public abstract class ToolBase { options.addOption( esClusterOption ); options.addOption( ugClusterOption ); options.addOption( remoteOption ); + options.addOption( ugAppKeyspace ); + options.addOption( ugLocksKeyspace ); options.addOption( verbose ); return options;
