Repository: usergrid Updated Branches: refs/heads/master 893a04ddc -> c8282f04b
Add tool for auditing versions of entities in Elasticsearch. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c8282f04 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c8282f04 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c8282f04 Branch: refs/heads/master Commit: c8282f04b2dbe602cdc28d0330cbf4168b4ce6ee Parents: 893a04d Author: Michael Russo <[email protected]> Authored: Thu Oct 26 10:34:12 2017 -0700 Committer: Michael Russo <[email protected]> Committed: Thu Oct 26 10:34:12 2017 -0700 ---------------------------------------------------------------------- .../usergrid/tools/EntityVersionAudit.java | 275 +++++++++++++++++++ 1 file changed, 275 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/c8282f04/stack/tools/src/main/java/org/apache/usergrid/tools/EntityVersionAudit.java ---------------------------------------------------------------------- diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/EntityVersionAudit.java b/stack/tools/src/main/java/org/apache/usergrid/tools/EntityVersionAudit.java new file mode 100644 index 0000000..c9f4860 --- /dev/null +++ b/stack/tools/src/main/java/org/apache/usergrid/tools/EntityVersionAudit.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.usergrid.tools; + + +import com.google.common.base.Optional; +import com.netflix.astyanax.MutationBatch; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; +import org.apache.usergrid.corepersistence.service.CollectionService; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.persistence.EntityManager; +import org.apache.usergrid.persistence.EntityRef; +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.SimpleEntityRef; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.SearchByEdgeType; +import org.apache.usergrid.persistence.graph.impl.SimpleEdge; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; +import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization; +import org.apache.usergrid.persistence.index.IndexLocationStrategy; +import 
org.apache.usergrid.persistence.index.SearchEdge;
import org.apache.usergrid.persistence.index.impl.EsProvider;
import org.apache.usergrid.persistence.index.utils.UUIDUtils;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.schema.CollectionInfo;
import org.apache.usergrid.utils.InflectionUtils;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

import java.io.*;
import java.net.URLEncoder;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createSearchEdgeFromSource;
import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId;


/**
 * Audits, for the entities of one collection in one application, whether each stored entity version
 * has a matching document in the Elasticsearch read alias.
 *
 * <p>Writes two files to the current working directory:
 * <ul>
 *   <li>{@code entity_version_audit.txt}: a CSV with one row per audited version. The columns are
 *       collection, entity UUID, version timestamp, Elasticsearch {@code _timestamp}, index delay in
 *       millis, and whether the document exists in Elasticsearch.</li>
 *   <li>{@code entity_version_agg.txt}: one {@code versionCount,entityUUID} row per audited entity.</li>
 * </ul>
 */
public class EntityVersionAudit extends ToolBase {

    private static final Logger logger = LoggerFactory.getLogger( EntityVersionAudit.class );

    private static final String APPLICATION_ARG = "app";

    private static final String ENTITY_TYPE_ARG = "entityType";

    private static final String USE_LATEST_VERSION_ARG = "useLatestVersion";

    private static final String ENTITY_UUID = "entityUUID";


    private EntityManager em;

    /**
     * Adds this tool's options to the base options. {@code app} and {@code entityType} are required.
     * {@code useLatestVersion} and {@code entityUUID} are optional.
     */
    @Override
    @SuppressWarnings( "static-access" )
    public Options createOptions() {

        Options options = super.createOptions();

        Option appOption = OptionBuilder.withArgName( APPLICATION_ARG ).hasArg().isRequired( true )
            .withDescription( "application id" ).create( APPLICATION_ARG );
        options.addOption( appOption );

        Option collectionOption = OptionBuilder.withArgName( ENTITY_TYPE_ARG ).hasArg().isRequired( true )
            .withDescription( "singular collection name" ).create( ENTITY_TYPE_ARG );
        options.addOption( collectionOption );

        Option useLatestVersion = OptionBuilder.withArgName( USE_LATEST_VERSION_ARG ).hasArg().isRequired( false )
            .withDescription( "use latest version" ).create( USE_LATEST_VERSION_ARG );
        options.addOption( useLatestVersion );

        Option entityUUID = OptionBuilder.withArgName( ENTITY_UUID ).hasArg().isRequired( false )
            .withDescription( "specific entity uuid" ).create( ENTITY_UUID );
        options.addOption( entityUUID );

        return options;
    }


    /**
     * Walks the collection edges of the application and audits each matching entity's versions
     * against Elasticsearch, writing the two report files described on the class.
     *
     * <p>A failure on one entity or one version is logged and the audit continues with the next one.
     *
     * @param line parsed command line
     * @throws RuntimeException if the application id or entity type is blank
     * @throws IllegalArgumentException if the application id or {@code entityUUID} is not a valid UUID
     * @throws Exception if Spring startup, writer creation, or edge iteration fails
     */
    @Override
    public void runTool( CommandLine line ) throws Exception {

        logger.info( "Starting Tool: EntityVersionAudit" );
        logger.info( "Using Cassandra consistency level: {}", System.getProperty( "usergrid.read.cl", "CL_LOCAL_QUORUM" ) );

        startSpring();

        final String applicationOption = line.getOptionValue( APPLICATION_ARG );
        final String entityTypeOption = line.getOptionValue( ENTITY_TYPE_ARG );

        if ( isBlank( applicationOption ) ) {
            throw new RuntimeException( "Application ID not provided." );
        }
        final UUID app = UUID.fromString( applicationOption );

        if ( isBlank( entityTypeOption ) ) {
            throw new RuntimeException( "Entity type (singular collection name) not provided." );
        }
        final String entityType = entityTypeOption;

        // Only an explicit, case-insensitive "true" enables the flag. Absent or any other value means false.
        final boolean useLatestVersion = "true".equalsIgnoreCase( line.getOptionValue( USE_LATEST_VERSION_ARG ) );
        logger.info( "useLatestVersion {}", useLatestVersion );

        final String entityUUID = line.getOptionValue( ENTITY_UUID );
        logger.info( "entityUUID {}", entityUUID );
        // Parse once, before any output file is opened, so a malformed value fails fast
        // instead of failing on the first edge.
        final UUID entityUUIDFilter = entityUUID == null ? null : UUID.fromString( entityUUID );

        em = emf.getEntityManager( app );

        final String collectionName = InflectionUtils.pluralize( entityType );
        final String simpleEdgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
        logger.info( "simpleEdgeType: {}", simpleEdgeType );

        final ApplicationScope applicationScope = new ApplicationScopeImpl( new SimpleId( app, "application" ) );
        final Id applicationScopeId = applicationScope.getApplication();
        logger.info( "applicationScope.getApplication(): {}", applicationScopeId );

        final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
        final GraphManager gm = gmf.createEdgeManager( applicationScope );

        // Named ecmf so it does not shadow the inherited "emf" field used above.
        final EntityCollectionManagerFactory ecmf = injector.getInstance( EntityCollectionManagerFactory.class );
        final EntityCollectionManager ecm = ecmf.createCollectionManager( applicationScope );

        final SimpleSearchByEdgeType search =
            new SimpleSearchByEdgeType( applicationScopeId, simpleEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                Optional.absent(), false );

        final IndexLocationStrategyFactory ilsf = injector.getInstance( IndexLocationStrategyFactory.class );
        // The strategy is looked up from applicationScope alone, so resolve the read alias once, not per version.
        final IndexLocationStrategy strategy = ilsf.getIndexLocationStrategy( applicationScope );
        final String readAlias = strategy.getAlias().getReadAlias();

        final EsProvider esProvider = injector.getInstance( EsProvider.class );

        // try-with-resources so both report files are flushed and closed even if the audit aborts.
        try ( Writer versionAuditWriter = new BufferedWriter(
                  new OutputStreamWriter( new FileOutputStream( "entity_version_audit.txt" ), "utf-8" ) );
              Writer versionAggWriter = new BufferedWriter(
                  new OutputStreamWriter( new FileOutputStream( "entity_version_agg.txt" ), "utf-8" ) ) ) {

            versionAuditWriter.write(
                "collection,entityUUID,cassandraTimestamp,elasticsearchTimestamp,indexDelayMillis,existsInElasticsearch\n" );
            versionAuditWriter.flush();

            gm.loadEdgesFromSource( search ).doOnNext( markedEdge -> {

                final UUID uuid = markedEdge.getTargetNode().getUuid();
                if ( entityUUIDFilter != null && !uuid.equals( entityUUIDFilter ) ) {
                    return;
                }
                logger.info( "matched uuid: {}", uuid );

                try {
                    final EntityRef entityRef = new SimpleEntityRef( entityType, uuid );
                    final org.apache.usergrid.persistence.Entity retrieved = em.get( entityRef );
                    if ( retrieved == null ) {
                        logger.info( "entity: {} NOT FOUND", uuid );
                        return;
                    }
                    final Id entityId = retrieved.asId();

                    // The search edge depends only on the entity, not the version, so build it once per entity.
                    final SearchEdge searchEdge = createSearchEdgeFromSource( new SimpleEdge( applicationScopeId,
                        CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ),
                        entityId, Long.MAX_VALUE ) );

                    Observable<MvccLogEntry> versionObs =
                        ecm.getVersionsFromMaxToMin( entityId, org.apache.usergrid.utils.UUIDUtils.newTimeUUID() );
                    if ( useLatestVersion ) {
                        versionObs = versionObs.take( 1 );
                    }

                    final AtomicInteger versionCount = new AtomicInteger();
                    // Block until every version is processed so the count written below is complete.
                    versionObs.toBlocking().forEach( mvccLogEntry -> {
                        final UUID version = mvccLogEntry.getVersion();
                        try {
                            final String esDocId = createIndexDocId( applicationScope, entityId, version, searchEdge );
                            // _timestamp is requested explicitly because getField() below reads only returned fields.
                            final GetResponse response = esProvider.getClient()
                                .prepareGet( readAlias, "entity", esDocId )
                                .setFields( "_timestamp" )
                                .execute()
                                .actionGet();
                            final boolean exists = response.isExists();

                            final long indexTimestamp = readEsTimestamp( response );
                            // Use this version's timestamp, not the entity UUID's, so each row reflects its own version.
                            // Assumes versions are time-based UUIDs; the upper bound passed above is a time UUID.
                            final long versionTimestamp = UUIDUtils.getTimestampInMillis( version );
                            // Reported as 0 when Elasticsearch returned no timestamp.
                            final long indexDelayMillis =
                                indexTimestamp > 0 ? indexTimestamp - versionTimestamp : 0L;

                            final String csvLine =
                                collectionName + ","
                                    + uuid + ","
                                    + versionTimestamp + ","
                                    + indexTimestamp + ","
                                    + indexDelayMillis + ","
                                    + exists;

                            versionAuditWriter.write( csvLine + "\n" );
                            versionAuditWriter.flush();
                            versionCount.incrementAndGet();

                        } catch ( Exception e ) {
                            logger.error( "Failed to audit version {} of entity {}", version, uuid, e );
                        }
                    } );

                    versionAggWriter.write( versionCount.get() + "," + entityId.getUuid() + "\n" );
                    versionAggWriter.flush();

                } catch ( Exception e ) {
                    logger.error( "Failed to audit entity {}", uuid, e );
                }
            } ).toBlocking().lastOrDefault( null );
        }
    }


    /**
     * Reads the {@code _timestamp} field from an Elasticsearch GET response.
     *
     * @param response the GET response for one entity version document
     * @return the field's numeric value, or 0 when the field is absent or its value is not numeric.
     *         The value is assumed to be epoch millis, since it is compared with UUID timestamps in millis.
     */
    private static long readEsTimestamp( final GetResponse response ) {
        if ( response.getField( "_timestamp" ) == null ) {
            return 0L;
        }
        final Object value = response.getField( "_timestamp" ).getValue();
        // Accept any numeric representation rather than assuming a boxed Long, which a raw cast would require.
        return value instanceof Number ? ( (Number) value ).longValue() : 0L;
    }
}
