This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new a4ee6c0 ATLAS-4370: Persist Metrics for user to retrieve Metrics info at a past timestamp a4ee6c0 is described below commit a4ee6c05ad6e8759cf2945024684bfbd1e7af8e1 Author: JP Li <jp.li@8634.local> AuthorDate: Thu Dec 2 01:15:53 2021 -0500 ATLAS-4370: Persist Metrics for user to retrieve Metrics info at a past timestamp Co-authored-by: Disha Talreja <dishatalr...@cloudera.com> Signed-off-by: Sarath Subramanian <sar...@apache.org> --- addons/models/0000-Area0/0010-base_model.json | 42 ++++ .../java/org/apache/atlas/AtlasConfiguration.java | 4 +- .../main/java/org/apache/atlas/AtlasErrorCode.java | 1 + .../model/metrics/AtlasMetricsMapToChart.java | 62 +++++ .../atlas/model/metrics/AtlasMetricsStat.java | 159 +++++++++++++ .../org/apache/atlas/utils/AtlasEntityUtil.java | 50 ++++ .../repository/graph/GraphBackedSearchIndexer.java | 7 + .../ogm/metrics/AtlasMetricsStatDTO.java | 153 ++++++++++++ .../org/apache/atlas/services/MetricsService.java | 264 ++++++++++++++++++++- .../test/java/org/apache/atlas/TestModules.java | 2 + .../apache/atlas/services/MetricsServiceTest.java | 78 +++++- .../resources/solr/core-template/solrconfig.xml | 2 +- .../apache/atlas/web/resources/AdminResource.java | 196 ++++++++++++++- 13 files changed, 1001 insertions(+), 19 deletions(-) diff --git a/addons/models/0000-Area0/0010-base_model.json b/addons/models/0000-Area0/0010-base_model.json index 769d885..a4a9248 100644 --- a/addons/models/0000-Area0/0010-base_model.json +++ b/addons/models/0000-Area0/0010-base_model.json @@ -371,6 +371,48 @@ ] }, { + "name": "__AtlasMetricsStat", + "superTypes": [ + "__internal" + ], + "serviceType": "atlas_core", + "typeVersion": "1.0", + "attributeDefs": [ + { + "name": "metricsId", + "typeName": "string", + "isOptional": false, + "cardinality": "SINGLE", + "isUnique": true, + "isIndexable": true + }, + { + "name": "metrics", + "typeName": "string", + "isOptional": true, + "cardinality": "SINGLE", + "isUnique": false, + "isIndexable": false + }, + { + "name": "collectionTime", + "typeName": "long", + "isOptional": true, + "cardinality": "SINGLE", + "isUnique": false, + "isIndexable": true + }, + { + "name": "timeToLiveMillis", + "typeName": "long", + "isOptional": true, + "cardinality": "SINGLE", + "isUnique": false, + "isIndexable": true + } + ] + }, + { "name": "__ExportImportAuditEntry", "serviceType": "atlas_core", "typeVersion": "1.0", diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index b63fab7..28fb68a 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -82,8 +82,8 @@ public enum AtlasConfiguration { DEBUG_METRICS_ENABLED("atlas.debug.metrics.enabled", false), TASKS_USE_ENABLED("atlas.tasks.enabled", true), SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1), - UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true); - + UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true), + METRICS_TIME_TO_LIVE_HOURS( "atlas.metrics.ttl.hours", 336); // 14 days default private static final Configuration APPLICATION_PROPERTIES; diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index 7d09261..8bc7996 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -211,6 +211,7 @@ public enum AtlasErrorCode { GLOSSARY_TERM_ALREADY_EXISTS(409, "ATLAS-409-00-009", "Glossary term with qualifiedName {0} already exists"), GLOSSARY_CATEGORY_ALREADY_EXISTS(409, "ATLAS-409-00-00A", "Glossary category with qualifiedName {0} already exists"), GLOSSARY_IMPORT_FAILED(409, "ATLAS-409-00-011", "Glossary import failed"), + METRICSSTAT_ALREADY_EXISTS(409, "ATLAS-409-00-012", "Metric Statistics already collected at {0}"), // All internal errors go here INTERNAL_ERROR(500, "ATLAS-500-00-001", "Internal server error {0}"), diff --git a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetricsMapToChart.java b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetricsMapToChart.java new file mode 100644 index 0000000..6e74f65 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetricsMapToChart.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.model.metrics; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.util.List; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +/** + * AtlasMetricsForChart is a formatted data type specifically for rendering the Stacked Area Chart. + * The Stacked Area Chart takes three String values as "key". For the Atlas Metrics entity, they are "Active", "Deleted", and "Shell". + * The Stacked Area Chart also takes a list of pairs (primitive values) as "values". + * The first element in the pair is collectionTime. It is used for rendering x-axis of the chart. + * The second element is the Atlas Metrics entity's count. It is used for rendering y-axis of the chart. + */ +@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class AtlasMetricsMapToChart { + private String key; + private List<long[]> values; + + public AtlasMetricsMapToChart(String key, List<long[]> values) { + this.key = key; + this.values = values; + } + + public String getKey() { return key; } + + public void setKey(String key) { + this.key = key; + } + + public List<long[]> getValues() { + return values; + } + + public void setValues(List<long[]> values) { + this.values = values; + } + +} \ No newline at end of file diff --git a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetricsStat.java b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetricsStat.java new file mode 100644 index 0000000..5a30cad --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetricsStat.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.model.metrics; + + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.model.AtlasBaseModelObject; +import org.apache.atlas.utils.AtlasEntityUtil; +import org.apache.commons.collections.CollectionUtils; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + + +/** + * Atlas MetricsStat which includes Metrics' collection time and time to live (TTL). + */ +@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class AtlasMetricsStat extends AtlasBaseModelObject implements Serializable { + + public static final String METRICS_CATEGORY_GENERAL_PROPERTY = "general"; + public static final String METRICS_COLLECTION_TIME_PROPERTY = "collectionTime"; + public static final String METRICS_ID_PREFIX_PROPERTY = "atlas_metrics_"; + + private String metricsId; + private long collectionTime; + private long timeToLiveMillis; + + private Map<String, Object> typeData; + + @JsonInclude(JsonInclude.Include.NON_NULL) + private AtlasMetrics metrics; + + public AtlasMetricsStat() { + } + + public AtlasMetricsStat(String guid) { + setGuid(guid); + } + + public AtlasMetricsStat(AtlasMetrics metrics){ + this(metrics, null); + } + + public AtlasMetricsStat(AtlasMetrics metrics,List<String> listOfTypeNames) { + this(metrics, TimeUnit.HOURS.toMillis(AtlasConfiguration.METRICS_TIME_TO_LIVE_HOURS.getInt()),listOfTypeNames); + + } + + public AtlasMetricsStat(AtlasMetrics metrics, long timeToLiveMillis, List<String> listOfTypeNames) { + collectionTime = metrics == null ? + System.currentTimeMillis() : (long) metrics.getMetric(METRICS_CATEGORY_GENERAL_PROPERTY, METRICS_COLLECTION_TIME_PROPERTY); + setCollectionTime(collectionTime); + + setMetricsId(METRICS_ID_PREFIX_PROPERTY + getCollectionTime() + "@" + AtlasEntityUtil.getMetadataNamespace()); + + setTimeToLiveMillis(timeToLiveMillis); + setMetrics(metrics); + setGuid(getGuid()); + + this.typeData = CollectionUtils.isEmpty(listOfTypeNames) ? null : new HashMap<>(); + AtlasEntityUtil.metricsToTypeData(metrics, listOfTypeNames, typeData); + } + + + + public String getMetricsId() { + return metricsId; + } + + public void setMetricsId(String metricsId) { + this.metricsId = metricsId; + } + + public long getCollectionTime() { + return collectionTime; + } + + public void setCollectionTime(long collectionTime) { + this.collectionTime = collectionTime; + } + + public long getTimeToLiveMillis() { + return timeToLiveMillis; + } + + public void setTimeToLiveMillis(long timeToLiveMillis) { + this.timeToLiveMillis = timeToLiveMillis; + } + + public AtlasMetrics getMetrics() { + return metrics; + } + + public void setMetrics(AtlasMetrics metrics) { + this.metrics = metrics; + } + + public Map<String, Object> getTypeData() { + return typeData; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + AtlasMetricsStat that = (AtlasMetricsStat) o; + return Objects.equals(metrics, that.metrics); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), metrics); + } + + @Override + protected StringBuilder toString(StringBuilder sb) { + sb.append(", metricsId=").append(metricsId); + sb.append(", collectionTime=").append(collectionTime); + sb.append(", timeToLiveMillis=").append(timeToLiveMillis); + sb.append(", metrics="); + if (metrics == null) { + sb.append("null"); + } else { + sb.append(metrics); + } + + return sb; + } +} \ No newline at end of file diff --git a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java index 1e78e25..2b7846c 100644 --- a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java +++ b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java @@ -18,8 +18,11 @@ package org.apache.atlas.utils; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.model.metrics.AtlasMetrics; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; @@ -41,6 +44,15 @@ public class AtlasEntityUtil { private static final int SOFT_REFERENCE_FORMAT_INDEX_TYPE_NAME = 0; private static final int SOFT_REFERENCE_FORMAT_INDEX_GUID = 1; + public static final String CONF_METADATA_NAMESPACE = "atlas.metadata.namespace"; + public static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; + public static final String DEFAULT_CLUSTER_NAME = "default"; + + protected static final String ENTITY = "entity"; + protected static final String ACTIVE = "Active"; + protected static final String DELETED = "Deleted"; + protected static final String SHELL = "Shell"; + protected static final String[] STATUS_CATEGORY = {ACTIVE, DELETED, SHELL}; public static String formatSoftRefValue(String typeName, String guid) { return String.format(SOFT_REF_FORMAT, typeName, guid); @@ -166,4 +178,42 @@ public class AtlasEntityUtil { return ret; } + public static String getMetadataNamespace() { + String ret = StringUtils.EMPTY; + try { + ret = ApplicationProperties.get().getString(CONF_METADATA_NAMESPACE, StringUtils.EMPTY); + if (StringUtils.isEmpty(ret)) { + ret = ApplicationProperties.get().getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME); + } + } catch (AtlasException e) { + LOG.info("Failed to load application properties", e); + } + return StringUtils.isNotEmpty(ret) ? ret : DEFAULT_CLUSTER_NAME; + } + + public static void metricsToTypeData(AtlasMetrics metrics, String typeName, Map<String, Object> typeData) { + if (typeData == null) { + return; + } + + Map<String, Integer> innerVal = new HashMap<>(); + + for (String status : STATUS_CATEGORY) { + Map<String, Integer> metricsMap = (Map<String, Integer>) metrics.getData().get(ENTITY).get(ENTITY + status); + innerVal.put(status, metricsMap.getOrDefault(typeName, 0)); + } + + typeData.put(typeName, innerVal); + } + + public static void metricsToTypeData(AtlasMetrics metrics, List<String> typeNames, Map<String, Object> typeData) { + if (typeData == null || CollectionUtils.isEmpty(typeNames)) { + return; + } + + for (String typeName : typeNames) { + metricsToTypeData(metrics, typeName, typeData); + } + } + } diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index d65bb1a..9924b2e 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -356,6 +356,13 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang // index recovery createCommonVertexIndex(management, PROPERTY_KEY_INDEX_RECOVERY_NAME, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false); + //metrics + createCommonVertexIndex(management," __AtlasMetricsStat.metricsId", UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false); + createCommonVertexIndex(management," __AtlasMetricsStat.__u_metricsId", UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false); + createCommonVertexIndex(management," __AtlasMetricsStat.metrics", UniqueKind.NONE, String.class, SINGLE, true, false); + createCommonVertexIndex(management," __AtlasMetricsStat.collectionTime", UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false); + createCommonVertexIndex(management," __AtlasMetricsStat.timeToLiveMillis", UniqueKind.NONE, String.class, SINGLE, true, false); + // create vertex-centric index createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class, SINGLE); createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, Boolean.class, SINGLE); diff --git a/repository/src/main/java/org/apache/atlas/repository/ogm/metrics/AtlasMetricsStatDTO.java b/repository/src/main/java/org/apache/atlas/repository/ogm/metrics/AtlasMetricsStatDTO.java new file mode 100644 index 0000000..6af935e --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/ogm/metrics/AtlasMetricsStatDTO.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.ogm.metrics; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.metrics.AtlasMetrics; +import org.apache.atlas.model.metrics.AtlasMetricsStat; +import org.apache.atlas.repository.impexp.AuditsWriter; +import org.apache.atlas.repository.ogm.AbstractDataTransferObject; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.atlas.model.metrics.AtlasMetricsStat.METRICS_ID_PREFIX_PROPERTY; + + +/** + * AtlasMetricsStatDTO is the bridge class in between AtlasMetricsStat and AtlasEntity. + */ +@Component +public class AtlasMetricsStatDTO extends AbstractDataTransferObject<AtlasMetricsStat> { + private static final Logger LOG = LoggerFactory.getLogger(AtlasMetricsStatDTO.class); + + public static final String METRICS_ENTITY_TYPE_NAME = "__AtlasMetricsStat"; + public static final String METRICS_ID_PROPERTY = "metricsId"; + private static final String METRICS_PROPERTY = "metrics"; + private static final String COLLECTION_TIME_PROPERTY = "collectionTime"; + private static final String TIME_TO_LIVE_PROPERTY = "timeToLiveMillis"; + private static final String UNIQUE_NAME_PROPERTY = "uniqueName"; + + @Inject + public AtlasMetricsStatDTO(AtlasTypeRegistry typeRegistry) { + super(typeRegistry, AtlasMetricsStat.class, METRICS_ENTITY_TYPE_NAME); + } + + @Override + public AtlasMetricsStat from(AtlasEntity entity) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasMetricsStatDTO.from({})", entity); + } + + AtlasMetricsStat metricsStat = null; + + String jsonMetrics = (String) entity.getAttribute(METRICS_PROPERTY); + + if (StringUtils.isNotEmpty(jsonMetrics)) { + metricsStat = new AtlasMetricsStat(AtlasType.fromJson(jsonMetrics, AtlasMetrics.class)); + } + + if (metricsStat == null) { + LOG.error("MetricStat cannot be created without metric info. Null has been returned."); + } else { + metricsStat.setGuid(entity.getGuid()); + metricsStat.setMetricsId((String) entity.getAttribute(METRICS_ID_PROPERTY)); + + metricsStat.setCollectionTime((long) entity.getAttribute(COLLECTION_TIME_PROPERTY)); + + metricsStat.setTimeToLiveMillis((long) entity.getAttribute(TIME_TO_LIVE_PROPERTY)); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasMetricsStatDTO.from() : {}", metricsStat); + } + + return metricsStat; + } + + @Override + public AtlasMetricsStat from(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasMetricsStatDTO.from({})", entityWithExtInfo); + } + + AtlasMetricsStat ret = from(entityWithExtInfo.getEntity()); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasMetricsStatDTO.from() : {}", ret); + } + return ret; + } + + @Override + public AtlasEntity toEntity(AtlasMetricsStat obj) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasMetricsStatDTO.toEntity({})", obj); + } + + AtlasEntity entity = getDefaultAtlasEntity(obj); + + entity.setAttribute(METRICS_ID_PROPERTY, getUniqueValue(obj)); + + if (obj.getMetrics() != null) { + entity.setAttribute(METRICS_PROPERTY, AtlasType.toJson(obj.getMetrics())); + } + + entity.setAttribute(COLLECTION_TIME_PROPERTY, obj.getCollectionTime()); + entity.setAttribute(TIME_TO_LIVE_PROPERTY, obj.getTimeToLiveMillis()); + entity.setAttribute(UNIQUE_NAME_PROPERTY, getUniqueValue(obj)); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasMetricsStatDTO.toEntity() : {}", entity); + } + return entity; + } + + @Override + public AtlasEntityWithExtInfo toEntityWithExtInfo(AtlasMetricsStat obj) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasMetricsStatDTO.toEntityWithExtInfo({})", obj); + } + AtlasEntityWithExtInfo ret = new AtlasEntityWithExtInfo(toEntity(obj)); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasMetricsStatDTO.toEntityWithExtInfo() : {}", ret); + } + return ret; + } + + @Override + public Map<String, Object> getUniqueAttributes(AtlasMetricsStat obj) { + Map<String, Object> ret = new HashMap<>(); + ret.put(METRICS_ID_PROPERTY, getUniqueValue(obj)); + return ret; + } + + private String getUniqueValue(AtlasMetricsStat obj) { + return METRICS_ID_PREFIX_PROPERTY + obj.getCollectionTime() + "@" + AuditsWriter.getCurrentClusterName(); + } +} diff --git a/repository/src/main/java/org/apache/atlas/services/MetricsService.java b/repository/src/main/java/org/apache/atlas/services/MetricsService.java index ceb2528..9ec2cd2 100644 --- a/repository/src/main/java/org/apache/atlas/services/MetricsService.java +++ b/repository/src/main/java/org/apache/atlas/services/MetricsService.java @@ -17,40 +17,62 @@ */ package org.apache.atlas.services; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.SortOrder; import org.apache.atlas.annotation.AtlasService; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.authorize.AtlasTypesDefFilterRequest; +import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity.Status; import org.apache.atlas.model.metrics.AtlasMetrics; +import org.apache.atlas.model.metrics.AtlasMetricsMapToChart; +import org.apache.atlas.model.metrics.AtlasMetricsStat; import org.apache.atlas.model.typedef.AtlasClassificationDef; import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.util.AtlasMetricJVMUtil; import org.apache.atlas.util.AtlasMetricsUtil; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.inject.Inject; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Objects; +import java.time.Instant; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.apache.atlas.discovery.SearchProcessor.AND_STR; import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.ogm.metrics.AtlasMetricsStatDTO.METRICS_ENTITY_TYPE_NAME; +import static org.apache.atlas.repository.ogm.metrics.AtlasMetricsStatDTO.METRICS_ID_PROPERTY; @AtlasService public class MetricsService { private static final Logger LOG = LoggerFactory.getLogger(MetricsService.class); + private final DataAccess dataAccess; + // Query Category constants public static final String TYPE = "type"; public static final String TYPE_SUBTYPES = "typeAndSubTypes"; @@ -76,6 +98,7 @@ public class MetricsService { protected static final String METRIC_ENTITY_ACTIVE_INCL_SUBTYPES = ENTITY + "Active"+"-"+TYPE_SUBTYPES; protected static final String METRIC_ENTITY_DELETED_INCL_SUBTYPES = ENTITY + "Deleted"+"-"+TYPE_SUBTYPES; protected static final String METRIC_ENTITY_SHELL_INCL_SUBTYPES = ENTITY + "Shell"+"-"+TYPE_SUBTYPES; + protected static final String[] STATUS_CATEGORY = {"Active", "Deleted", "Shell"}; private final AtlasGraph atlasGraph; private final AtlasTypeRegistry typeRegistry; @@ -83,10 +106,12 @@ public class MetricsService { private final String indexSearchPrefix = AtlasGraphUtilsV2.getIndexSearchPrefix(); @Inject - public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil) { + public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil, + DataAccess dataAccess) { this.atlasGraph = graph; this.typeRegistry = typeRegistry; this.metricsUtil = metricsUtil; + this.dataAccess = dataAccess; } @SuppressWarnings("unchecked") @@ -134,7 +159,6 @@ public class MetricsService { } } - for (AtlasEntityDef entityDef : entityDefs) { AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityDef.getName()); @@ -262,4 +286,240 @@ public class MetricsService { return ret; } + public AtlasMetricsStat saveMetricsStat(AtlasMetricsStat metricsStat) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> MetricsService.saveMetricsStat({})", metricsStat); + } + + if (Objects.isNull(metricsStat) || StringUtils.isEmpty(metricsStat.getMetricsId())) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "MetricsStat definition missing."); + } + + if (metricsStatExists(metricsStat)) { + throw new AtlasBaseException(AtlasErrorCode.METRICSSTAT_ALREADY_EXISTS, String.valueOf(metricsStat.getCollectionTime())); + } + + AtlasMetricsStat storeObject = dataAccess.save(metricsStat); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== MetricsService.saveMetricsStat() : {}", storeObject); + } + + return storeObject; + } + + public void purgeMetricsStats() throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> MetricsService.purgeMetricsStats()"); + } + + long currentTimeMillis = System.currentTimeMillis(); + + List<AtlasMetricsStat> metricsStats = getAllMetricsStats(true) + .stream() + .filter(c -> c.getCollectionTime() + c.getTimeToLiveMillis() < currentTimeMillis) + .collect(Collectors.toList()); + + for (AtlasMetricsStat a : metricsStats) { + long collectedTime = a.getCollectionTime(); + deleteMetricsStatByCollectionTime(String.valueOf(collectedTime)); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== MetricsService.purgeMetricsStats() : {}", metricsStats); + } + } + + @GraphTransaction + public AtlasMetricsStat getMetricsStatByCollectionTime(final String collectionTime) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> MetricsService.getMetricsStatByCollectionTime({})", collectionTime); + } + + if (StringUtils.isBlank(collectionTime)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "collectionTime is null/empty"); + } + + AtlasMetricsStat ret; + + AtlasMetricsStat metricsStat = new AtlasMetricsStat(); + metricsStat.setCollectionTime(Long.parseLong(collectionTime)); + + ret = dataAccess.load(metricsStat); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== MetricsService.getMetricsStatByCollectionTime() : {}", ret); + } + + return ret; + } + + @GraphTransaction + public void deleteMetricsStatByCollectionTime(final String collectionTime) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> MetricsService.deleteMetricsStatByCollectionTime({})", collectionTime); + } + + if (StringUtils.isEmpty(collectionTime)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, collectionTime); + } + + AtlasMetricsStat deleteStat = getMetricsStatByCollectionTime(collectionTime); + + dataAccess.delete(deleteStat.getGuid()); + + // delete log + if (LOG.isDebugEnabled()) { + long currTime = System.currentTimeMillis(); + long collectedTime = deleteStat.getCollectionTime(); + + LOG.info("MetricsService.deleteMetricsStatByCollectionTime(): At {}, metricsStat with collectionTime: {}, persisted hours: {}, is deleted. ", + Instant.ofEpochMilli(currTime), + Instant.ofEpochMilli(collectedTime), + TimeUnit.MILLISECONDS.toHours(currTime - collectedTime) + ); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== MetricsService.deleteMetricsStatByCollectionTime({})", collectionTime); + } + + } + + @GraphTransaction + public List<AtlasMetricsStat> getAllMetricsStats(Boolean minInfo) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> MetricsService.getAllMetricsStats()"); + } + + List<AtlasMetricsStat> ret = new ArrayList<>(); + // SortOrder.ASCENDING is a necessary input parameter. It only sorts GUIDs, but not collectionTime. + List<String> guids = AtlasGraphUtilsV2.findEntityGUIDsByType(METRICS_ENTITY_TYPE_NAME, SortOrder.ASCENDING); + + if (CollectionUtils.isNotEmpty(guids)) { + List<AtlasMetricsStat> metricsToLoad = guids.stream() + .map(AtlasMetricsStat::new) + .collect(Collectors.toList()); + + Iterable<AtlasMetricsStat> metricsStats = dataAccess.load(metricsToLoad); + + + ret = StreamSupport.stream(metricsStats.spliterator(), false) + .sorted((a, b) -> (int) (b.getCollectionTime() - a.getCollectionTime())) + .map(m -> { + if(minInfo) { + m.setMetrics(null); + } + return m; + }).collect(Collectors.toList()); + + } else { + ret = Collections.emptyList(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== MetricsService.getAllMetricsStats() : {}", ret); + } + + return ret; + } + + + public List<AtlasMetricsStat> getMetricsInRangeByTypeNames(long startTime, + long endTime, + List<String> typeNames) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> MetricsService.getMetricsInRangeByTypeNames({}, {}, {})", startTime, endTime, String.join(", ", typeNames)); + } + + if (startTime >= endTime) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, + "startTime: '" + startTime + "', should be less than, endTime: '" + endTime + "'"); + } + + List<AtlasMetricsStat> metricsInRange; + List<AtlasMetricsStat> allMetrics = getAllMetricsStats(false); + + metricsInRange = allMetrics.stream() + .filter(m -> m.getCollectionTime() >= startTime && m.getCollectionTime() <= endTime) + .map(m -> { + m = new AtlasMetricsStat(m.getMetrics(), typeNames); + m.setMetrics(null); + return m; + }) + .collect(Collectors.toList()); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== MetricsService.getMetricsInRangeByTypeNames() : {}", metricsInRange); + } + + return metricsInRange; + } + + public Map<String, List<AtlasMetricsMapToChart>> getMetricsForChartByTypeNames(long startTime, + long endTime, + List<String> typeNames) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> MetricsService.getMetricsForChartByTypeNames({}, {}, {})", startTime, endTime, typeNames); + } + + // Calling getMetricsInRangeByTypeNames() and constructing AtlasMetricsStat with list of typeNames, to retrieve JanusGraph only once. + Map<String, List<AtlasMetricsMapToChart>> ret = new HashMap<>(); + + // Returned metrics were sorted by collectionTime descendingly. Reverse it to ascending order to match stacked area chart's required input format. + List<AtlasMetricsStat> metrics = getMetricsInRangeByTypeNames(startTime, endTime, typeNames); + Collections.reverse(metrics); + + for (String typeName : typeNames) { + Map<String, List<long[]>> statusCategory = mapToStatusCategoryByOneType(metrics, typeName); + + if (MapUtils.isNotEmpty(statusCategory)) { + ret.put(typeName, statusCategory.entrySet() + .stream() + .map(c -> new AtlasMetricsMapToChart(c.getKey(), c.getValue())) + .collect(Collectors.toList()) + ); + } else { + LOG.info("MetricsService.getMetricsForChartByTypeNames() : data of typeName:{} cannot be found.", typeName); + ret.put(typeName, Collections.emptyList()); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== MetricsService.getMetricsForChartByTypeNames() : {}", ret); + } + + return ret; + } + + /** Mapping each typeName's counting info in AtlasMetricsStat to the required format to render the stacked area chart. + * Keys: 3 categories: Active, Deleted & Shell. + * Values: a list of pair with the first element as collectionTime, and the second element as count. + */ + private Map<String, List<long[]>> mapToStatusCategoryByOneType(List<AtlasMetricsStat> metrics, String typeName) { + // Use LinkedHashMap to make sure the status are in order as Active, Deleted and Shell for rendering chart + Map<String, List<long[]>> statusCategory = new LinkedHashMap<>(); + + for (AtlasMetricsStat metric : metrics) { + Map<String, Integer> metricsMap = null; + if (metric.getTypeData() != null) { + metricsMap = (Map<String, Integer>) metric.getTypeData().get(typeName); + } + + for (String status : STATUS_CATEGORY) { + long statusCnt = metricsMap == null? (long) 0: metricsMap.get(status); + statusCategory.computeIfAbsent(status, a -> new ArrayList<>()).add(new long[]{metric.getCollectionTime(), statusCnt}); + } + } + + return statusCategory; + } + + private boolean metricsStatExists(AtlasMetricsStat metricsStat) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(typeRegistry.getEntityTypeByName(METRICS_ENTITY_TYPE_NAME), new HashMap<String, Object>() {{ + put(METRICS_ID_PROPERTY, metricsStat.getMetricsId()); + }}); + return Objects.nonNull(vertex); + } + } \ No newline at end of file diff --git a/repository/src/test/java/org/apache/atlas/TestModules.java b/repository/src/test/java/org/apache/atlas/TestModules.java index 8dda208..a0a6354 100644 --- a/repository/src/test/java/org/apache/atlas/TestModules.java +++ b/repository/src/test/java/org/apache/atlas/TestModules.java @@ -52,6 +52,7 @@ import org.apache.atlas.repository.ogm.ExportImportAuditEntryDTO; import org.apache.atlas.repository.ogm.glossary.AtlasGlossaryCategoryDTO; import org.apache.atlas.repository.ogm.glossary.AtlasGlossaryDTO; import org.apache.atlas.repository.ogm.glossary.AtlasGlossaryTermDTO; +import org.apache.atlas.repository.ogm.metrics.AtlasMetricsStatDTO; import org.apache.atlas.repository.ogm.profiles.AtlasSavedSearchDTO; import org.apache.atlas.repository.ogm.profiles.AtlasUserProfileDTO; import org.apache.atlas.repository.store.graph.AtlasEntityStore; @@ -178,6 +179,7 @@ public class TestModules { availableDTOs.addBinding().to(AtlasServerDTO.class); availableDTOs.addBinding().to(ExportImportAuditEntryDTO.class); availableDTOs.addBinding().to(AtlasAuditEntryDTO.class); + availableDTOs.addBinding().to(AtlasMetricsStatDTO.class); bind(DTORegistry.class).asEagerSingleton(); bind(DataAccess.class).asEagerSingleton(); diff --git a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java index 0405921..d114bf5 100644 --- a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java @@ -17,10 +17,12 @@ */ package org.apache.atlas.services; +import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.RequestContext; import org.apache.atlas.TestModules; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.metrics.AtlasMetrics; +import org.apache.atlas.model.metrics.AtlasMetricsStat; import org.apache.atlas.repository.AtlasTestBase; import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.impexp.ImportService; @@ -46,18 +48,9 @@ import java.util.HashMap; import java.util.Map; import static org.apache.atlas.model.metrics.AtlasMetrics.*; +import static org.apache.atlas.services.MetricsService.*; import static org.apache.atlas.utils.TestLoadModelUtils.loadModelFromJson; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters; -import static org.apache.atlas.services.MetricsService.ENTITY; -import static org.apache.atlas.services.MetricsService.GENERAL; -import static org.apache.atlas.services.MetricsService.METRIC_ENTITIES_PER_TAG; -import static org.apache.atlas.services.MetricsService.METRIC_ENTITY_ACTIVE; -import static org.apache.atlas.services.MetricsService.METRIC_ENTITY_COUNT; -import static org.apache.atlas.services.MetricsService.METRIC_ENTITY_DELETED; -import static org.apache.atlas.services.MetricsService.METRIC_TAG_COUNT; -import static org.apache.atlas.services.MetricsService.METRIC_TYPE_COUNT; -import static org.apache.atlas.services.MetricsService.METRIC_TYPE_UNUSED_COUNT; -import static org.apache.atlas.services.MetricsService.TAG; import static org.testng.Assert.*; @Guice(modules = TestModules.TestOnlyModule.class) @@ -118,6 +111,9 @@ public class MetricsServiceTest extends AtlasTestBase { put(STAT_NOTIFY_FAILED_COUNT_PREV_DAY, 1L); }}; + private AtlasMetrics metrics; + private AtlasMetricsStat blankMetricsStat, metricsStatInGraph; + @BeforeClass public void setup() throws Exception { RequestContext.clear(); @@ -145,9 +141,9 @@ public class MetricsServiceTest extends AtlasTestBase { super.cleanup(); } - @Test + @Test(groups = "Metrics.CREATE") public void testGetMetrics() { - AtlasMetrics metrics = metricsService.getMetrics(); + metrics = metricsService.getMetrics(); assertNotNull(metrics); @@ -171,6 +167,64 @@ public class MetricsServiceTest extends AtlasTestBase { assertEquals(deletedEntityMetricsActual, deletedEntityMetricsExpected); } + @Test(groups = "Metrics.CREATE", dependsOnMethods = "testGetMetrics") + public void testSaveMetricsStat() { + try { + blankMetricsStat = new AtlasMetricsStat(metrics); + metricsStatInGraph = metricsService.saveMetricsStat(blankMetricsStat); + } catch (AtlasBaseException e) { + fail("Save metricsStat should've succeeded", e); + } + + // Duplicate create calls should fail + try { + AtlasMetricsStat blankMetricsStatDup = new AtlasMetricsStat(metrics); + metricsService.saveMetricsStat(blankMetricsStatDup); + fail("Save duplicate metricsStat should've failed"); + } catch (AtlasBaseException e) { + assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.METRICSSTAT_ALREADY_EXISTS); + } + } + + @Test(groups = "Metrics.CREATE", dependsOnMethods = "testSaveMetricsStat") + public void testGetMetricsStatByCollectionTime() { + // collectionTime is empty string + try { + AtlasMetricsStat metricsStatRet = metricsService.getMetricsStatByCollectionTime(" "); + fail("Get metricsStat by collectionTime should've failed, when collectionTime is empty."); + } catch (AtlasBaseException e) { + assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.INVALID_PARAMETERS); + } + + // collectionTime is null + try { + AtlasMetricsStat metricsStatRet = metricsService.getMetricsStatByCollectionTime(null); + fail("Get metricsStat by collectionTime should've failed, when collectionTime is null."); + } catch (AtlasBaseException e) { + assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.INVALID_PARAMETERS); + } + + // collectionTime is NOT existed + try { + Long collectionTimeInGraph = System.currentTimeMillis(); + AtlasMetricsStat metricsStatRet = metricsService.getMetricsStatByCollectionTime(String.valueOf(collectionTimeInGraph)); + fail("Get metricsStat by collectionTime should've failed, when collectionTime is NOT existed."); + } catch (AtlasBaseException e) { + assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND); + } + + // collectionTime is correct + try { + Long collectionTimeInGraph = (Long) metrics.getMetric(GENERAL, METRIC_COLLECTION_TIME); + AtlasMetricsStat metricsStatRet = metricsService.getMetricsStatByCollectionTime(String.valueOf(collectionTimeInGraph)); + assertNotNull(metricsStatRet); + assertEquals(metricsStatRet.getGuid(), metricsStatInGraph.getGuid()); + assertEquals(metricsStatRet.getMetricsId(), metricsStatInGraph.getMetricsId()); + } catch (AtlasBaseException e) { + fail("Get metricsStat by valid collectionTime in Graph should've succeeded."); + } + } + @Test public void testNotificationMetrics() { Instant now = Clock.systemUTC().instant(); diff --git a/test-tools/src/main/resources/solr/core-template/solrconfig.xml b/test-tools/src/main/resources/solr/core-template/solrconfig.xml index 3341e71..fa38e72 100644 --- a/test-tools/src/main/resources/solr/core-template/solrconfig.xml +++ b/test-tools/src/main/resources/solr/core-template/solrconfig.xml @@ -445,7 +445,7 @@ --> <lst name="defaults"> <str name="defType">edismax</str> - <str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 14at_l 15vp_t 1891_t 19tx_t 1bet_t 1czp_t 1ekl_t 1gxx_t 1iit_l 1k3p_t 1lol_t 3z0l_t 4bnp_s 4a2t_s 47ph_s 464l_s 49ad_t 4h6t_t 4d8l_t 4eth_l 4flx_t 4lxh_t 4kcl_t 4nid_t 4umd_t 505h_t 54w5_t 52it_t 53b9_t 51qd_t 5b7p_t 5af9_t 5j45_l 5kp1_l 5hj9_t 5fyd_t 5m9x_t 5c05_t 5csl_t 5edh_t 5wjp_t 5xc5_l 5tdx_t 5q85_l 5rt1_l 5n2d_t 5uyt_t 8mit_t 8emd_t 8o3p_t 8k5h_ [...] + <str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 14at_l 15vp_t 1891_t 19tx_t 1bet_t 1czp_t 1ekl_t 1gxx_t 1iit_l 1k3p_t 1lol_t 1o1x_t 1qf9_t 1ssl_t 1udh_t 1wqt_t 4d8l_t 4pvp_s 4oat_s 4lxh_s 4kcl_s 4nid_t 4vet_t 4rgl_t 4t1h_l 4ttx_t 505h_t 4ykl_t 51qd_t 58ud_t 5edh_t 5j45_t 5gqt_t 5hj9_t 5fyd_t 5pfp_t 5on9_t 5wjp_l 5y4l_l 5q85_t 5vr9_t 66th_l 68ed_l 658l_t 63np_t 69z9_t 5zph_t 60hx_t 622t_t 6k91_t 6l1h_ [...] <str name="hl.fl">*</str> <bool name="hl.requireFieldMatch">true</bool> <bool name="lowercaseOperators">true</bool> diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index 0580f7f..d55ada7 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -47,6 +47,8 @@ import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.metrics.AtlasMetrics; +import org.apache.atlas.model.metrics.AtlasMetricsMapToChart; +import org.apache.atlas.model.metrics.AtlasMetricsStat; import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches; import org.apache.atlas.model.tasks.AtlasTask; import org.apache.atlas.repository.audit.AtlasAuditService; @@ -78,6 +80,9 @@ import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.security.core.Authentication; import org.springframework.security.core.GrantedAuthority; import org.springframework.security.core.context.SecurityContextHolder; @@ -125,6 +130,7 @@ import static org.apache.atlas.web.filters.AtlasCSRFPreventionFilter.CSRF_TOKEN; @Path("admin") @Singleton @Service +@EnableScheduling public class AdminResource { private static final Logger LOG = LoggerFactory.getLogger(AdminResource.class); private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("AdminResource"); @@ -145,6 +151,9 @@ public class AdminResource { private static final String OPERATION_STATUS = "operationStatus"; private static final List TIMEZONE_LIST = Arrays.asList(TimeZone.getAvailableIDs()); + private static final String METRICS_PERSIST_INTERVAL = "atlas.metrics.persist.schedule"; + private static final String METRICS_PERSIST_INTERVAL_DEFAULT = "0 0 0/1 * * *"; // 1 hour interval + @Context private HttpServletRequest httpServletRequest; @@ -409,8 +418,156 @@ public class AdminResource { return metrics; } - private void releaseExportImportLock() { - importExportOperationLock.unlock(); + /** Auto-scheduling API for both creating a Metrics entity and saving it to the database at in preset time interval, + * and sweeping through entities that are outside of the valid ttl hours. + * @throws AtlasBaseException when the MetricsStat entity has already existed. + */ + @Scheduled(cron="#{getCronExpression}") + public void scheduleSaveAndDeleteMetrics() throws AtlasBaseException { + + if (LOG.isDebugEnabled()) { + LOG.debug("==> AdminResource.scheduleSaveAndDeleteMetrics()"); + } + + // auto persist + saveMetrics(); + + // auto purge + metricsService.purgeMetricsStats(); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AdminResource.scheduleSaveAndDeleteMetrics()"); + } + } + + /** + * Bulk retrieval API for getting all MetricsStats, with mininfo flag return metrics with specific details, or with minimal information. + * @return all MetricsStats in Atlas. + * @throws AtlasBaseException when there is no MetricsStats entity in the database. + */ + @GET + @Path("metricsstats") + @Produces(Servlets.JSON_MEDIA_TYPE) + public List<AtlasMetricsStat> getAllMetrics(@QueryParam("mininfo") @DefaultValue("true") Boolean minInfo) throws AtlasBaseException { + AtlasPerfTracer perf = null; + + try { + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AdminResource.getAllMetrics()"); + } + + return metricsService.getAllMetricsStats(minInfo); + } finally { + AtlasPerfTracer.log(perf); + } + } + + /** + * Retrieval API for retrieving the MetricsStat with a specific collectionTime. + * @return the MetricsStat with the specific collectionTime. + * @throws AtlasBaseException when the MetricsStat entity with this specific collectionTime cannot be found. + */ + @GET + @Path("metricsstat/{collectionTime}") + @Produces(Servlets.JSON_MEDIA_TYPE) + public AtlasMetricsStat getMetricsByCollectionTime(@PathParam("collectionTime") String collectionTime) throws AtlasBaseException { + Servlets.validateQueryParamLength("collectionTime", collectionTime); + + AtlasPerfTracer perf = null; + + try { + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer + .getPerfTracer(PERF_LOG, + "AdminResource.getMetricsByCollectionTime(collectionTime=" + collectionTime + ")"); + } + + return metricsService.getMetricsStatByCollectionTime(collectionTime); + } finally { + AtlasPerfTracer.log(perf); + } + } + + /** Retrieval API for retrieving persisted MetricsStats with collectionTime within range of startTime and endTime. + * @param startTime start timestamp of the time range. + * @param endTime end timestamp of the time range. + * @param typeNames a list of typeNames with their counting information, as well as their metrics' minimal information. + * @return persisted Metrics with its collectionTime within time range, in the form of minimal information. + * @throws AtlasBaseException when the input of startTime and endTime is null or invalid. + */ + @GET + @Path("metricsstats/range") + @Produces(Servlets.JSON_MEDIA_TYPE) + public List<AtlasMetricsStat> getMetricsInTimeRange(@QueryParam("startTime") String startTime, + @QueryParam("endTime") String endTime, + @QueryParam("typeName") List<String> typeNames) throws AtlasBaseException { + if (StringUtils.isBlank(startTime) || StringUtils.isBlank(endTime)) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "startTime or endTime is null/empty."); + } + + Servlets.validateQueryParamLength("startTime", startTime); + Servlets.validateQueryParamLength("endTime", endTime); + for (String typeName : typeNames) { + Servlets.validateQueryParamLength("typeName", typeName); + } + + AtlasPerfTracer perf = null; + try { + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, + "AdminResource.getMetricsInTimeRange(startTime=" + startTime + ", " + + "endTime=" + endTime + ", " + + "listOfTypeNames=" + String.join(", ", typeNames) + ")" ); + } + + return metricsService.getMetricsInRangeByTypeNames(Long.parseLong(startTime), Long.parseLong(endTime), typeNames); + } finally { + AtlasPerfTracer.log(perf); + } + } + + /** Retrieval API for retrieving & formatting MetricsStats (within valid range) to render stacked area chart. The process contains: + * 1. retrieve persisted MetricsStats with collectionTime within range of startTime and endTime by one typeName + * 2. map the returned MetricsStats to the required format for rendering stacked area chart + * Currently, one typeName corresponds to one chart. The API can take multiple typeNames. The returned JSON file can be used to render multiple charts. + * @param startTime start timestamp of the time range. + * @param endTime end timestamp of the time range. + * @param typeNames a list of typeNames with their counting information, as well as their metrics' minimal information. + * @return formatted metrics to render one or multiple stacked area charts. + * @throws AtlasBaseException when the input of startTime and endTime is null or invalid. + */ + @GET + @Path("metricsstats/charts") + @Produces(Servlets.JSON_MEDIA_TYPE) + public Map<String, List<AtlasMetricsMapToChart>> getMetricsForChartByTypeNames( + @QueryParam("startTime") String startTime, + @QueryParam("endTime") String endTime, + @QueryParam("typeName") List<String> typeNames) throws AtlasBaseException { + + if (StringUtils.isBlank(startTime) || StringUtils.isBlank(endTime)) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "startTime or endTime is null/empty."); + } + + Servlets.validateQueryParamLength("startTime", startTime); + Servlets.validateQueryParamLength("endTime", endTime); + for (String typeName : typeNames) { + Servlets.validateQueryParamLength("typeName", typeName); + } + + AtlasPerfTracer perf = null; + try { + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, + "AdminResource.getMetricsForChartByTypeNames(" + + "startTime=" + startTime + ", " + + "endTime=" + endTime + ", " + + "listOfTypeNames=" + String.join(", ", typeNames) + ")" ); + } + + return metricsService.getMetricsForChartByTypeNames(Long.parseLong(startTime), Long.parseLong(endTime), typeNames); + } finally { + AtlasPerfTracer.log(perf); + } } @POST @@ -867,4 +1024,39 @@ public class AdminResource { auditService.add(auditOperation, params, AtlasJson.toJson(entityCountByType), resultCount); } + + private void releaseExportImportLock() { + importExportOperationLock.unlock(); + } + + /** Get customized time interval to persist metrics in CM, or use default persist hour (1hr interval). + * There are 6 fields. Default 1 hr interval: 0 0 0/1 * * * + */ + @Bean + private String getCronExpression() { + if (atlasProperties != null) { + return atlasProperties.getString(METRICS_PERSIST_INTERVAL, METRICS_PERSIST_INTERVAL_DEFAULT); + } else { + return METRICS_PERSIST_INTERVAL_DEFAULT; + } + } + + /** Save an AtlasMetrics as AtlasMetricsStat to db. + * @throws AtlasBaseException when the AtlasMetricsStat is null or when the AtlasMetricsStat already exists in db. + */ + private void saveMetrics() throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AdminResource.saveMetrics()"); + } + + AtlasMetrics metrics = metricsService.getMetrics(); + + AtlasMetricsStat metricsStat = new AtlasMetricsStat(metrics); + metricsService.saveMetricsStat(metricsStat); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AdminResource.saveMetrics()"); + } + } + }