This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 2bd6f46 ATLAS-3276: Fix stale transactions in atlas due to ATLAS-3246 (Free-text search)
2bd6f46 is described below
commit 2bd6f4691dd733b4c204f3f1df4e3de0e31fdb42
Author: Sarath Subramanian <[email protected]>
AuthorDate: Thu Jun 13 14:06:17 2019 -0700
ATLAS-3276: Fix stale transactions in atlas due to ATLAS-3246 (Free-text search)
---
.../graphdb/janus/AtlasJanusGraphIndexClient.java | 76 +++++++++++++++-------
.../org/apache/atlas/services/MetricsService.java | 2 +
.../org/apache/atlas/util/AtlasMetricsUtil.java | 28 ++++++++
3 files changed, 83 insertions(+), 23 deletions(-)
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
index 4dd641d..3a64d31 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
@@ -135,7 +135,8 @@ public class AtlasJanusGraphIndexClient implements
AtlasGraphIndexClient {
@Override
public Map<String, List<AtlasAggregationEntry>>
getAggregatedMetrics(String queryString, Set<String> propertyKeyNames) {
- SolrClient solrClient = null;
+ SolrClient solrClient = null;
+ AtlasGraphManagement management = graph.getManagementSystem();
try {
solrClient = Solr6Index.getSolrClient(); // get solr client using
same settings as that of Janus Graph
@@ -153,7 +154,6 @@ public class AtlasJanusGraphIndexClient implements
AtlasGraphIndexClient {
}
SolrQuery solrQuery = new
SolrQuery();
- AtlasGraphManagement management =
graph.getManagementSystem();
Map<String, String> indexFieldName2PropertyKeyNameMap = new
HashMap<>();
solrQuery.setQuery(queryString);
@@ -192,7 +192,9 @@ public class AtlasJanusGraphIndexClient implements
AtlasGraphIndexClient {
}
} catch (Exception e) {
LOG.error("Error enocunted in getting the aggregation metrics.
Will return empty agregation.", e);
- }finally {
+ } finally {
+ graphManagementCommit(management);
+
Solr6Index.releaseSolrClient(solrClient);
}
@@ -214,7 +216,7 @@ public class AtlasJanusGraphIndexClient implements
AtlasGraphIndexClient {
//update the request handler
performRequestHandlerAction(collectionName, solrClient,
-
generatePayLoadForSuggestions(generateSuggestionsString(collectionName,
graph.getManagementSystem(), suggestionProperties)));
+
generatePayLoadForSuggestions(generateSuggestionsString(collectionName,
suggestionProperties)));
} catch (Throwable t) {
String msg = String.format("Error encountered in creating the
request handler '%s' for collection '%s'", Constants.TERMS_REQUEST_HANDLER,
collectionName);
@@ -282,6 +284,24 @@ public class AtlasJanusGraphIndexClient implements
AtlasGraphIndexClient {
return Collections.EMPTY_LIST;
}
+ private void graphManagementCommit(AtlasGraphManagement management) {
+ try {
+ management.commit();
+ } catch (Exception ex) {
+ LOG.warn("Graph transaction management commit failed; attempting
rollback: {}", ex);
+
+ graphManagementRollback(management);
+ }
+ }
+
+ private void graphManagementRollback(AtlasGraphManagement management) {
+ try {
+ management.rollback();
+ } catch (Exception ex) {
+ LOG.warn("Graph transaction management rollback failed: {}", ex);
+ }
+ }
+
@VisibleForTesting
static List<String> getTopTerms(Map<String, TermFreq> termsMap) {
final List<String> ret;
@@ -353,44 +373,54 @@ public class AtlasJanusGraphIndexClient implements
AtlasGraphIndexClient {
}
- private String generateSearchWeightString(AtlasGraphManagement management,
String indexName, Map<String, Integer> propertyName2SearchWeightMap) {
- StringBuilder searchWeightBuilder = new StringBuilder();
-
- for (Map.Entry<String, Integer> entry :
propertyName2SearchWeightMap.entrySet()) {
- AtlasPropertyKey propertyKey =
management.getPropertyKey(entry.getKey());
- String indexFieldName =
management.getIndexFieldName(indexName, propertyKey);
+ private String generateSearchWeightString(String indexName, Map<String,
Integer> propertyName2SearchWeightMap) {
+ StringBuilder searchWeightBuilder = new StringBuilder();
+ AtlasGraphManagement management = graph.getManagementSystem();
- searchWeightBuilder.append(" ")
- .append(indexFieldName)
- .append("^")
- .append(entry.getValue().intValue());
+ try {
+ for (Map.Entry<String, Integer> entry :
propertyName2SearchWeightMap.entrySet()) {
+ AtlasPropertyKey propertyKey =
management.getPropertyKey(entry.getKey());
+ String indexFieldName =
management.getIndexFieldName(indexName, propertyKey);
+
+ searchWeightBuilder.append(" ")
+ .append(indexFieldName)
+ .append("^")
+ .append(entry.getValue().intValue());
+ }
+ } finally {
+ graphManagementCommit(management);
}
return searchWeightBuilder.toString();
}
- private String generateSuggestionsString(String collectionName,
AtlasGraphManagement management, List<String> suggestionProperties) {
- StringBuilder stringBuilder = new StringBuilder();
+ private String generateSuggestionsString(String collectionName,
List<String> suggestionProperties) {
+ StringBuilder ret = new StringBuilder();
+ AtlasGraphManagement management = graph.getManagementSystem();
- for(String propertyName: suggestionProperties) {
- AtlasPropertyKey propertyKey =
management.getPropertyKey(propertyName);
- String indexFieldName =
management.getIndexFieldName(collectionName, propertyKey);
+ try {
+ for (String propertyName : suggestionProperties) {
+ AtlasPropertyKey propertyKey =
management.getPropertyKey(propertyName);
+ String indexFieldName =
management.getIndexFieldName(collectionName, propertyKey);
- stringBuilder.append("'").append(indexFieldName).append("', ");
+ ret.append("'").append(indexFieldName).append("', ");
+ }
+ } finally {
+ graphManagementCommit(management);
}
- return stringBuilder.toString();
+ return ret.toString();
}
private V2Response updateFreeTextRequestHandler(SolrClient solrClient,
String collectionName, Map<String, Integer> propertyName2SearchWeightMap)
throws IOException, SolrServerException, AtlasBaseException {
- String searchWeightString =
generateSearchWeightString(graph.getManagementSystem(), collectionName,
propertyName2SearchWeightMap);
+ String searchWeightString = generateSearchWeightString(collectionName,
propertyName2SearchWeightMap);
String payLoadString =
generatePayLoadForFreeText("update-requesthandler", searchWeightString);
return performRequestHandlerAction(collectionName, solrClient,
payLoadString);
}
private V2Response createFreeTextRequestHandler(SolrClient solrClient,
String collectionName, Map<String, Integer> propertyName2SearchWeightMap)
throws IOException, SolrServerException, AtlasBaseException {
- String searchWeightString =
generateSearchWeightString(graph.getManagementSystem(), collectionName,
propertyName2SearchWeightMap);
+ String searchWeightString = generateSearchWeightString(collectionName,
propertyName2SearchWeightMap);
String payLoadString =
generatePayLoadForFreeText("create-requesthandler", searchWeightString);
return performRequestHandlerAction(collectionName, solrClient,
payLoadString);
diff --git a/repository/src/main/java/org/apache/atlas/services/MetricsService.java b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
index 8fb68e9..62e81e2 100644
--- a/repository/src/main/java/org/apache/atlas/services/MetricsService.java
+++ b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
@@ -18,6 +18,7 @@
package org.apache.atlas.services;
import org.apache.atlas.annotation.AtlasService;
+import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.graphdb.AtlasGraph;
@@ -74,6 +75,7 @@ public class MetricsService {
}
@SuppressWarnings("unchecked")
+ @GraphTransaction
public AtlasMetrics getMetrics() {
Collection<String> entityDefNames =
typeRegistry.getAllEntityDefNames();
Collection<String> classificationDefNames =
typeRegistry.getAllClassificationDefNames();
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
index 0c86aff..f658caa 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
@@ -165,10 +165,15 @@ public class AtlasMetricsUtil {
@Override
public void run() {
graph.query().has(TYPE_NAME_PROPERTY_KEY,
TYPE_NAME_INTERNAL).vertices(1);
+
+ graphCommit();
}
}, 10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error(e.getMessage());
+
+ graphRollback();
+
return false;
}
@@ -183,10 +188,15 @@ public class AtlasMetricsUtil {
@Override
public void run() {
graph.indexQuery(Constants.VERTEX_INDEX,
query).vertices(0, 1);
+
+ graphCommit();
}
}, 10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error(e.getMessage());
+
+ graphRollback();
+
return false;
}
@@ -228,6 +238,24 @@ public class AtlasMetricsUtil {
}
}
+ private void graphCommit() {
+ try {
+ graph.commit();
+ } catch (Exception ex) {
+ LOG.warn("Graph transaction commit failed: {}; attempting to
rollback graph transaction.", ex);
+
+ graphRollback();
+ }
+ }
+
+ private void graphRollback() {
+ try {
+ graph.rollback();
+ } catch (Exception ex) {
+ LOG.warn("Graph transaction rollback failed: {}", ex);
+ }
+ }
+
private String millisToTimeDiff(long msDiff) {
StringBuilder sb = new StringBuilder();