This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bd6f46  ATLAS-3276: Fix stale transactions in atlas due to ATLAS-3246 
(Free-text search)
2bd6f46 is described below

commit 2bd6f4691dd733b4c204f3f1df4e3de0e31fdb42
Author: Sarath Subramanian <[email protected]>
AuthorDate: Thu Jun 13 14:06:17 2019 -0700

    ATLAS-3276: Fix stale transactions in atlas due to ATLAS-3246 (Free-text 
search)
---
 .../graphdb/janus/AtlasJanusGraphIndexClient.java  | 76 +++++++++++++++-------
 .../org/apache/atlas/services/MetricsService.java  |  2 +
 .../org/apache/atlas/util/AtlasMetricsUtil.java    | 28 ++++++++
 3 files changed, 83 insertions(+), 23 deletions(-)

diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
index 4dd641d..3a64d31 100644
--- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
@@ -135,7 +135,8 @@ public class AtlasJanusGraphIndexClient implements 
AtlasGraphIndexClient {
 
     @Override
     public Map<String, List<AtlasAggregationEntry>> 
getAggregatedMetrics(String queryString, Set<String> propertyKeyNames) {
-        SolrClient solrClient = null;
+        SolrClient           solrClient = null;
+        AtlasGraphManagement management = graph.getManagementSystem();
 
         try {
             solrClient = Solr6Index.getSolrClient(); // get solr client using 
same settings as that of Janus Graph
@@ -153,7 +154,6 @@ public class AtlasJanusGraphIndexClient implements 
AtlasGraphIndexClient {
             }
 
             SolrQuery            solrQuery                         = new 
SolrQuery();
-            AtlasGraphManagement management                        = 
graph.getManagementSystem();
             Map<String, String>  indexFieldName2PropertyKeyNameMap = new 
HashMap<>();
 
             solrQuery.setQuery(queryString);
@@ -192,7 +192,9 @@ public class AtlasJanusGraphIndexClient implements 
AtlasGraphIndexClient {
             }
         } catch (Exception e) {
             LOG.error("Error enocunted in getting the aggregation metrics. 
Will return empty agregation.", e);
-        }finally {
+        } finally {
+            graphManagementCommit(management);
+
             Solr6Index.releaseSolrClient(solrClient);
         }
 
@@ -214,7 +216,7 @@ public class AtlasJanusGraphIndexClient implements 
AtlasGraphIndexClient {
 
             //update the request handler
             performRequestHandlerAction(collectionName, solrClient,
-                                        
generatePayLoadForSuggestions(generateSuggestionsString(collectionName, 
graph.getManagementSystem(), suggestionProperties)));
+                                        
generatePayLoadForSuggestions(generateSuggestionsString(collectionName, 
suggestionProperties)));
         } catch (Throwable t) {
             String msg = String.format("Error encountered in creating the 
request handler '%s' for collection '%s'", Constants.TERMS_REQUEST_HANDLER, 
collectionName);
 
@@ -282,6 +284,24 @@ public class AtlasJanusGraphIndexClient implements 
AtlasGraphIndexClient {
         return Collections.EMPTY_LIST;
     }
 
+    private void graphManagementCommit(AtlasGraphManagement management) {
+        try {
+            management.commit();
+        } catch (Exception ex) {
+            LOG.warn("Graph transaction management commit failed; attempting 
rollback: {}", ex);
+
+            graphManagementRollback(management);
+        }
+    }
+
+    private void graphManagementRollback(AtlasGraphManagement management) {
+        try {
+            management.rollback();
+        } catch (Exception ex) {
+            LOG.warn("Graph transaction management rollback failed: {}", ex);
+        }
+    }
+
     @VisibleForTesting
     static List<String> getTopTerms(Map<String, TermFreq> termsMap) {
         final List<String> ret;
@@ -353,44 +373,54 @@ public class AtlasJanusGraphIndexClient implements 
AtlasGraphIndexClient {
 
     }
 
-    private String generateSearchWeightString(AtlasGraphManagement management, 
String indexName, Map<String, Integer> propertyName2SearchWeightMap) {
-        StringBuilder searchWeightBuilder = new StringBuilder();
-
-        for (Map.Entry<String, Integer> entry : 
propertyName2SearchWeightMap.entrySet()) {
-            AtlasPropertyKey propertyKey    = 
management.getPropertyKey(entry.getKey());
-            String           indexFieldName = 
management.getIndexFieldName(indexName, propertyKey);
+    private String generateSearchWeightString(String indexName, Map<String, 
Integer> propertyName2SearchWeightMap) {
+        StringBuilder        searchWeightBuilder = new StringBuilder();
+        AtlasGraphManagement management          = graph.getManagementSystem();
 
-            searchWeightBuilder.append(" ")
-                    .append(indexFieldName)
-                    .append("^")
-                    .append(entry.getValue().intValue());
+        try {
+            for (Map.Entry<String, Integer> entry : 
propertyName2SearchWeightMap.entrySet()) {
+                AtlasPropertyKey propertyKey    = 
management.getPropertyKey(entry.getKey());
+                String           indexFieldName = 
management.getIndexFieldName(indexName, propertyKey);
+
+                searchWeightBuilder.append(" ")
+                        .append(indexFieldName)
+                        .append("^")
+                        .append(entry.getValue().intValue());
+            }
+        } finally {
+            graphManagementCommit(management);
         }
 
         return searchWeightBuilder.toString();
     }
 
-    private String generateSuggestionsString(String collectionName, 
AtlasGraphManagement management, List<String> suggestionProperties) {
-        StringBuilder stringBuilder = new StringBuilder();
+    private String generateSuggestionsString(String collectionName, 
List<String> suggestionProperties) {
+        StringBuilder        ret        = new StringBuilder();
+        AtlasGraphManagement management = graph.getManagementSystem();
 
-        for(String propertyName: suggestionProperties) {
-            AtlasPropertyKey propertyKey    = 
management.getPropertyKey(propertyName);
-            String           indexFieldName = 
management.getIndexFieldName(collectionName, propertyKey);
+        try {
+            for (String propertyName : suggestionProperties) {
+                AtlasPropertyKey propertyKey    = 
management.getPropertyKey(propertyName);
+                String           indexFieldName = 
management.getIndexFieldName(collectionName, propertyKey);
 
-            stringBuilder.append("'").append(indexFieldName).append("', ");
+                ret.append("'").append(indexFieldName).append("', ");
+            }
+        } finally {
+            graphManagementCommit(management);
         }
 
-        return stringBuilder.toString();
+        return ret.toString();
     }
 
     private V2Response updateFreeTextRequestHandler(SolrClient solrClient, 
String collectionName, Map<String, Integer> propertyName2SearchWeightMap) 
throws IOException, SolrServerException, AtlasBaseException {
-        String searchWeightString = 
generateSearchWeightString(graph.getManagementSystem(), collectionName, 
propertyName2SearchWeightMap);
+        String searchWeightString = generateSearchWeightString(collectionName, 
propertyName2SearchWeightMap);
         String payLoadString      = 
generatePayLoadForFreeText("update-requesthandler", searchWeightString);
 
         return performRequestHandlerAction(collectionName, solrClient, 
payLoadString);
     }
 
     private V2Response createFreeTextRequestHandler(SolrClient solrClient, 
String collectionName, Map<String, Integer> propertyName2SearchWeightMap) 
throws IOException, SolrServerException, AtlasBaseException {
-        String searchWeightString = 
generateSearchWeightString(graph.getManagementSystem(), collectionName, 
propertyName2SearchWeightMap);
+        String searchWeightString = generateSearchWeightString(collectionName, 
propertyName2SearchWeightMap);
         String payLoadString      = 
generatePayLoadForFreeText("create-requesthandler", searchWeightString);
 
         return performRequestHandlerAction(collectionName, solrClient, 
payLoadString);
diff --git 
a/repository/src/main/java/org/apache/atlas/services/MetricsService.java 
b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
index 8fb68e9..62e81e2 100644
--- a/repository/src/main/java/org/apache/atlas/services/MetricsService.java
+++ b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.services;
 
 import org.apache.atlas.annotation.AtlasService;
+import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.model.instance.AtlasEntity.Status;
 import org.apache.atlas.model.metrics.AtlasMetrics;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
@@ -74,6 +75,7 @@ public class MetricsService {
     }
 
     @SuppressWarnings("unchecked")
+    @GraphTransaction
     public AtlasMetrics getMetrics() {
         Collection<String> entityDefNames         = 
typeRegistry.getAllEntityDefNames();
         Collection<String> classificationDefNames = 
typeRegistry.getAllClassificationDefNames();
diff --git 
a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java 
b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
index 0c86aff..f658caa 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
@@ -165,10 +165,15 @@ public class AtlasMetricsUtil {
                 @Override
                 public void run() {
                     graph.query().has(TYPE_NAME_PROPERTY_KEY, 
TYPE_NAME_INTERNAL).vertices(1);
+
+                    graphCommit();
                 }
             }, 10, TimeUnit.SECONDS);
         } catch (Exception e) {
             LOG.error(e.getMessage());
+
+            graphRollback();
+
             return false;
         }
 
@@ -183,10 +188,15 @@ public class AtlasMetricsUtil {
                 @Override
                 public void run() {
                     graph.indexQuery(Constants.VERTEX_INDEX, 
query).vertices(0, 1);
+
+                    graphCommit();
                 }
             }, 10, TimeUnit.SECONDS);
         } catch (Exception e) {
             LOG.error(e.getMessage());
+
+            graphRollback();
+
             return false;
         }
 
@@ -228,6 +238,24 @@ public class AtlasMetricsUtil {
         }
     }
 
+    private void graphCommit() {
+        try {
+            graph.commit();
+        } catch (Exception ex) {
+            LOG.warn("Graph transaction commit failed: {}; attempting to 
rollback graph transaction.", ex);
+
+            graphRollback();
+        }
+    }
+
+    private void graphRollback() {
+        try {
+            graph.rollback();
+        } catch (Exception ex) {
+            LOG.warn("Graph transaction rollback failed: {}", ex);
+        }
+    }
+
     private String millisToTimeDiff(long msDiff) {
         StringBuilder sb = new StringBuilder();
 

Reply via email to