Repository: incubator-ranger
Updated Branches:
  refs/heads/master d40a02054 -> b1b0fb16c


RANGER-838 : Tag-sync should be resilient to Ranger Admin availability

Signed-off-by: Madhan Neethiraj <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/b1b0fb16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/b1b0fb16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/b1b0fb16

Branch: refs/heads/master
Commit: b1b0fb16cd4197a67a61d8a1db2c40863739327c
Parents: d40a020
Author: Abhay Kulkarni <[email protected]>
Authored: Wed Jan 27 18:46:49 2016 -0800
Committer: Madhan Neethiraj <[email protected]>
Committed: Tue Mar 1 18:48:45 2016 -0800

----------------------------------------------------------------------
 .../ranger/tagsync/model/AbstractTagSource.java |  25 +--
 .../apache/ranger/tagsync/model/TagSink.java    |   4 +-
 .../apache/ranger/tagsync/model/TagSource.java  |   5 +-
 .../ranger/tagsync/process/TagSyncConfig.java   |  17 ++
 .../ranger/tagsync/process/TagSynchronizer.java |  55 +++---
 .../tagsync/sink/tagadmin/TagAdminRESTSink.java | 170 ++++++++++++++-----
 .../tagsync/source/atlas/AtlasTagSource.java    |  50 +++---
 .../source/atlasrest/AtlasRESTTagSource.java    |  45 +++--
 .../tagsync/source/atlasrest/AtlasRESTUtil.java |  91 +++++-----
 .../tagsync/source/file/FileTagSource.java      |  53 +++---
 10 files changed, 331 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
index d6baeb2..d46170a 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
@@ -27,7 +27,6 @@ import org.apache.ranger.plugin.util.ServiceTags;
 public abstract  class AbstractTagSource implements TagSource {
        private static final Log LOG = 
LogFactory.getLog(AbstractTagSource.class);
        private TagSink tagSink;
-       protected boolean shutdown = false;
 
        @Override
        public void setTagSink(TagSink sink) {
@@ -37,29 +36,31 @@ public abstract  class AbstractTagSource implements 
TagSource {
                        this.tagSink = sink;
                }
        }
-       @Override
-       public void synchUp() {}
 
-       public void updateSink(final ServiceTags serviceTags) {
-               if (serviceTags == null) {
+       protected void updateSink(final ServiceTags toUpload) {
+               if (toUpload == null) {
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("No ServiceTags to upload");
                        }
                } else {
                        if (LOG.isDebugEnabled()) {
-                               String serviceTagsJSON = new 
Gson().toJson(serviceTags);
-                               LOG.debug("Uploading serviceTags=" + 
serviceTagsJSON);
+                               String toUploadJSON = new 
Gson().toJson(toUpload);
+                               LOG.debug("Uploading serviceTags=" + 
toUploadJSON);
                        }
 
                        try {
-                               tagSink.uploadServiceTags(serviceTags);
+                               ServiceTags uploaded = tagSink.upload(toUpload);
+
+                               if (LOG.isDebugEnabled()) {
+                                       String uploadedJSON = new 
Gson().toJson(uploaded);
+                                       LOG.debug("Uploaded serviceTags=" + 
uploadedJSON);
+                               }
                        } catch (Exception exception) {
-                               LOG.error("uploadServiceTags() failed..", 
exception);
+                               String toUploadJSON = new 
Gson().toJson(toUpload);
+                               LOG.error("Failed to upload serviceTags: " + 
toUploadJSON);
+                               LOG.error("Exception : ", exception);
                        }
                }
        }
 
-       public void stop() {
-               shutdown = true;
-       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
index 9eb5319..ae66e60 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
@@ -26,5 +26,7 @@ import java.util.Properties;
 
 public interface TagSink {
        boolean initialize(Properties properties);
-       void uploadServiceTags(ServiceTags serviceTags) throws Exception;
+       ServiceTags upload(ServiceTags toUpload) throws Exception;
+       boolean start();
+       void stop();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
index 7d19562..5ef6c57 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
@@ -19,6 +19,7 @@
 
 package org.apache.ranger.tagsync.model;
 
+
 import java.util.Properties;
 
 public interface TagSource {
@@ -27,12 +28,8 @@ public interface TagSource {
 
        void setTagSink(TagSink sink);
 
-       void synchUp();
-
        boolean start();
 
        void stop();
 
-       boolean isChanged();
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
index ed8679a..9588d66 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
@@ -72,6 +72,10 @@ public class TagSyncConfig extends Configuration {
 
        private static final long DEFAULT_TAGSYNC_REST_SOURCE_DOWNLOAD_INTERVAL 
= 900000;
 
+       private static final String 
TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL_PROP = 
"ranger.tagsync.tagadmin.connection.check.interval";
+
+       private static final int 
DEFAULT_TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL = 2000;
+
        private Properties props;
 
        public static TagSyncConfig getInstance() {
@@ -286,6 +290,19 @@ public class TagSyncConfig extends Configuration {
                return 
prop.getProperty(TAGSYNC_SOURCE_ATLAS_CUSTOM_RESOURCE_MAPPERS_PROP);
        }
 
+       static public long getTagAdminConnectionCheckInterval(Properties prop) {
+               long ret = DEFAULT_TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL;
+               String val = 
prop.getProperty(TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL_PROP);
+               if (StringUtils.isNotBlank(val)) {
+                       try {
+                               ret = Long.valueOf(val);
+                       } catch (NumberFormatException exception) {
+                               // Ignore
+                       }
+               }
+               return ret;
+       }
+
        private TagSyncConfig() {
                super(false);
                init() ;

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
index 696ce5e..5155681 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
@@ -32,10 +32,12 @@ public class TagSynchronizer {
 
        private static final Logger LOG = 
Logger.getLogger(TagSynchronizer.class);
 
-       private boolean shutdownFlag = false;
+       private TagSink tagSink = null;
        private List<TagSource> tagSources;
        private Properties properties = null;
 
+       private final Object shutdownNotifier = new Object();
+
        public static void main(String[] args) {
 
                TagSynchronizer tagSynchronizer = new TagSynchronizer();
@@ -49,23 +51,28 @@ public class TagSynchronizer {
                boolean tagSynchronizerInitialized = 
tagSynchronizer.initialize();
 
                if (tagSynchronizerInitialized) {
-                       tagSynchronizer.run();
+                       try {
+                               tagSynchronizer.run();
+                       } catch (Throwable t) {
+                               LOG.error("main thread caught exception..:", t);
+                               System.exit(1);
+                       }
                } else {
                        LOG.error("TagSynchronizer failed to initialize 
correctly, exiting..");
-                       System.exit(-1);
+                       System.exit(1);
                }
 
        }
 
-       public TagSynchronizer() {
-               setProperties(null);
+       TagSynchronizer() {
+               this(null);
        }
 
-       public TagSynchronizer(Properties properties) {
+       TagSynchronizer(Properties properties) {
                setProperties(properties);
        }
 
-       public void setProperties(Properties properties) {
+       void setProperties(Properties properties) {
                if (properties == null || MapUtils.isEmpty(properties)) {
                        this.properties = new Properties();
                } else {
@@ -89,7 +96,7 @@ public class TagSynchronizer {
 
                        LOG.info("Initializing TAG source and sink");
 
-                       TagSink tagSink = initializeTagSink(properties);
+                       tagSink = initializeTagSink(properties);
 
                        if (tagSink != null) {
 
@@ -113,39 +120,32 @@ public class TagSynchronizer {
                return ret;
        }
 
-       public void run() {
+       public void run() throws Exception {
                if (LOG.isDebugEnabled()) {
                        LOG.debug("==> TagSynchronizer.run()");
                }
 
-               long shutdownCheckIntervalInMs = 60*1000;
-
-               boolean tagSourcesStarted = true;
-
                try {
+                       boolean threadsStarted = tagSink.start();
+
                        for (TagSource tagSource : tagSources) {
-                               tagSourcesStarted = tagSourcesStarted && 
tagSource.start();
+                               threadsStarted = threadsStarted && 
tagSource.start();
                        }
 
-                       if (tagSourcesStarted) {
-                               while (!shutdownFlag) {
-                                       try {
-                                               LOG.debug("Sleeping for [" + 
shutdownCheckIntervalInMs + "] milliSeconds");
-                                               
Thread.sleep(shutdownCheckIntervalInMs);
-                                       } catch (InterruptedException e) {
-                                               LOG.error("Failed to wait for 
[" + shutdownCheckIntervalInMs + "] milliseconds before attempting to 
synchronize tag information ", e);
-                                               break;
-                                       }
+                       if (threadsStarted) {
+                               synchronized(shutdownNotifier) {
+                                       shutdownNotifier.wait();
                                }
                        }
-               } catch (Throwable t) {
-                       LOG.error("tag-sync main thread got an error", t);
                } finally {
                        LOG.info("Stopping all tagSources");
 
                        for (TagSource tagSource : tagSources) {
                                tagSource.stop();
                        }
+
+                       LOG.info("Stopping tagSink");
+                       tagSink.stop();
                }
 
                if (LOG.isDebugEnabled()) {
@@ -155,7 +155,10 @@ public class TagSynchronizer {
 
        public void shutdown(String reason) {
                LOG.info("Received shutdown(), reason=" + reason);
-               this.shutdownFlag = true;
+
+               synchronized(shutdownNotifier) {
+                       shutdownNotifier.notifyAll();
+               }
        }
 
        static public void printConfigurationProperties(Properties properties) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
index 1541034..c296b49 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
@@ -33,10 +33,15 @@ import org.apache.ranger.plugin.util.SearchFilter;
 import org.apache.ranger.plugin.util.ServiceTags;
 import org.apache.ranger.tagsync.process.TagSyncConfig;
 
+import javax.servlet.http.HttpServletResponse;
 import java.util.Map;
 import java.util.Properties;
 
-public class TagAdminRESTSink implements TagSink {
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TagAdminRESTSink implements TagSink, Runnable {
        private static final Log LOG = 
LogFactory.getLog(TagAdminRESTSink.class);
 
        private static final String REST_PREFIX = "/service";
@@ -46,45 +51,43 @@ public class TagAdminRESTSink implements TagSink {
 
        private static final String REST_URL_IMPORT_SERVICETAGS_RESOURCE = 
REST_PREFIX + MODULE_PREFIX + "/importservicetags/";
 
+       private long rangerAdminConnectionCheckInterval;
+
        private RangerRESTClient tagRESTClient = null;
 
+       private BlockingQueue<UploadWorkItem> uploadWorkItems;
+
+       private Thread myThread = null;
+
        @Override
        public boolean initialize(Properties properties) {
                if(LOG.isDebugEnabled()) {
                        LOG.debug("==> TagAdminRESTSink.initialize()");
                }
 
-               boolean ret = true;
+               boolean ret = false;
 
                String restUrl       = 
TagSyncConfig.getTagAdminRESTUrl(properties);
                String sslConfigFile = 
TagSyncConfig.getTagAdminRESTSslConfigFile(properties);
                String userName = TagSyncConfig.getTagAdminUserName(properties);
                String password = TagSyncConfig.getTagAdminPassword(properties);
+               rangerAdminConnectionCheckInterval = 
TagSyncConfig.getTagAdminConnectionCheckInterval(properties);
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("restUrl=" + restUrl);
                        LOG.debug("sslConfigFile=" + sslConfigFile);
                        LOG.debug("userName=" + userName);
+                       LOG.debug("rangerAdminConnectionCheckInterval" + 
rangerAdminConnectionCheckInterval);
                }
 
-               if (StringUtils.isBlank(restUrl)) {
-                       ret = false;
-                       LOG.error("No value specified for property 
'ranger.tagsync.tagadmin.rest.url'!");
-               } else {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("ranger.tagsync.tagadmin.rest.url:" + 
restUrl);
-                       }
-               }
-
-               if (ret) {
+               if (StringUtils.isNotBlank(restUrl)) {
                        tagRESTClient = new RangerRESTClient(restUrl, 
sslConfigFile);
                        tagRESTClient.setBasicAuthInfo(userName, password);
+                       uploadWorkItems = new 
LinkedBlockingQueue<UploadWorkItem>();
 
-                       ret = testConnection();
-               }
-
-               if (!ret) {
-                       LOG.error("Cannot connect to Tag Admin. Please recheck 
configuration properties and/or check if Tag Admin is running and responsive");
+                       ret = true;
+               } else {
+                       LOG.error("No value specified for property 
'ranger.tagsync.tagadmin.rest.url'!");
                }
 
                if(LOG.isDebugEnabled()) {
@@ -94,51 +97,53 @@ public class TagAdminRESTSink implements TagSink {
                return ret;
        }
 
-       public boolean testConnection() {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagAdminRESTSink.testConnection()");
+       @Override
+       public ServiceTags upload(ServiceTags toUpload) throws Exception {
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> upload() ");
                }
 
-               boolean ret = true;
+               UploadWorkItem uploadWorkItem = new UploadWorkItem(toUpload);
 
-               try {
-                       // build a dummy serviceTags structure and upload it to 
test connectivity
-                       ServiceTags serviceTags = new ServiceTags();
-                       serviceTags.setOp(ServiceTags.OP_ADD_OR_UPDATE);
-                       uploadServiceTags(serviceTags);
-               } catch (Exception exception) {
-                       LOG.error("test-upload of serviceTags failed.", 
exception);
-                       ret = false;
-               }
+               uploadWorkItems.put(uploadWorkItem);
 
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagAdminRESTSink.testConnection(), 
result=" + ret);
+               // Wait until message is successfully delivered
+               ServiceTags ret = uploadWorkItem.waitForUpload();
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== upload()");
                }
+
                return ret;
        }
 
-       @Override
-       synchronized public void uploadServiceTags(ServiceTags serviceTags) 
throws Exception {
+       private ServiceTags doUpload(ServiceTags serviceTags) throws Exception {
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> uploadServiceTags()");
+                       LOG.debug("==> doUpload()");
                }
                WebResource webResource = 
createWebResource(REST_URL_IMPORT_SERVICETAGS_RESOURCE);
 
                ClientResponse response    = 
webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).put(ClientResponse.class,
 tagRESTClient.toJson(serviceTags));
 
-               if(response == null || response.getStatus() != 204) {
-                       LOG.error("RangerAdmin REST call returned with 
response={" + response + "}");
+               if(response == null || response.getStatus() != 
HttpServletResponse.SC_NO_CONTENT) {
 
                        RESTResponse resp = 
RESTResponse.fromClientResponse(response);
 
                        LOG.error("Upload of service-tags failed with message " 
+ resp.getMessage());
 
-                       throw new Exception("Upload of service-tags failed with 
response: " + response);
+                       if (response == null || response.getStatus() != 
HttpServletResponse.SC_BAD_REQUEST) {
+                               // NOT an application error
+                               throw new Exception("Upload of service-tags 
failed with response: " + response);
+                       }
+
                }
 
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== uploadServiceTags()");
+                       LOG.debug("<== doUpload()");
                }
+
+               return serviceTags;
        }
 
        private WebResource createWebResource(String url) {
@@ -159,4 +164,91 @@ public class TagAdminRESTSink implements TagSink {
 
                return ret;
        }
+
+       @Override
+       public boolean start() {
+
+               myThread = new Thread(this);
+               myThread.setDaemon(true);
+               myThread.start();
+
+               return true;
+       }
+
+       @Override
+       public void stop() {
+               if (myThread != null && myThread.isAlive()) {
+                       myThread.interrupt();
+               }
+       }
+
+       @Override
+       public void run() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagAdminRESTSink.run()");
+               }
+
+               while (true) {
+                       UploadWorkItem uploadWorkItem;
+
+                       try {
+                               uploadWorkItem = uploadWorkItems.take();
+
+                               ServiceTags toUpload = 
uploadWorkItem.getServiceTags();
+
+                               boolean doRetry;
+
+                               do {
+                                       doRetry = false;
+
+                                       try {
+                                               ServiceTags uploaded = 
doUpload(toUpload);
+                                               // ServiceTags uploaded 
successfully
+                                               
uploadWorkItem.uploadCompleted(uploaded);
+                                       } catch (InterruptedException 
interrupted) {
+                                               LOG.error("Caught exception..: 
", interrupted);
+                                               return;
+                                       } catch (Exception exception) {
+                                               doRetry = true;
+                                               
Thread.sleep(rangerAdminConnectionCheckInterval);
+                                       }
+                               } while (doRetry);
+
+                       }
+                       catch (InterruptedException exception) {
+                               LOG.error("Interrupted..: ", exception);
+                               return;
+                       }
+               }
+
+       }
+
+       class UploadWorkItem {
+               private ServiceTags serviceTags;
+               private BlockingQueue<ServiceTags> uploadedServiceTags;
+
+               ServiceTags getServiceTags() {
+                       return serviceTags;
+               }
+
+               ServiceTags waitForUpload() throws InterruptedException {
+                       return uploadedServiceTags.take();
+               }
+
+               void uploadCompleted(ServiceTags uploaded) throws 
InterruptedException {
+                       // ServiceTags uploaded successfully
+                       uploadedServiceTags.put(uploaded);
+               }
+
+               UploadWorkItem(ServiceTags serviceTags) {
+                       setServiceTags(serviceTags);
+                       uploadedServiceTags = new 
ArrayBlockingQueue<ServiceTags>(1);
+               }
+
+               void setServiceTags(ServiceTags serviceTags) {
+                       this.serviceTags = serviceTags;
+               }
+
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 2499177..49d6f61 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -50,6 +50,7 @@ public class AtlasTagSource extends AbstractTagSource {
        public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = 
"atlas.kafka.entities.group.id";
 
        private ConsumerRunnable consumerTask;
+       private Thread myThread = null;
 
        @Override
        public boolean initialize(Properties properties) {
@@ -123,23 +124,24 @@ public class AtlasTagSource extends AbstractTagSource {
                if (LOG.isDebugEnabled()) {
                        LOG.debug("==> AtlasTagSource.start()");
                }
-               Thread consumerThread = null;
                if (consumerTask == null) {
                        LOG.error("No consumerTask!!!");
                } else {
-                       consumerThread = new Thread(consumerTask);
-                       consumerThread.setDaemon(true);
-                       consumerThread.start();
+                       myThread = new Thread(consumerTask);
+                       myThread.setDaemon(true);
+                       myThread.start();
                }
                if (LOG.isDebugEnabled()) {
                        LOG.debug("<== AtlasTagSource.start()");
                }
-               return consumerThread != null;
+               return myThread != null;
        }
 
        @Override
-       public boolean isChanged() {
-               return true;
+       public void stop() {
+               if (myThread != null && myThread.isAlive()) {
+                       myThread.interrupt();
+               }
        }
 
        private static String getPrintableEntityNotification(EntityNotification 
notification) {
@@ -175,24 +177,32 @@ public class AtlasTagSource extends AbstractTagSource {
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("==> ConsumerRunnable.run()");
                        }
-                       while (!shutdown) {
-                               if (hasNext()) {
-                                       EntityNotification notification = 
consumer.next();
-                                       if (notification != null) {
-                                               if (LOG.isDebugEnabled()) {
-                                                       
LOG.debug("Notification=" + getPrintableEntityNotification(notification));
-                                               }
-
-                                               ServiceTags serviceTags = 
AtlasNotificationMapper.processEntityNotification(notification);
-                                               if (serviceTags == null) {
-                                                       LOG.error("Failed to 
create ServiceTags for notification :" + 
getPrintableEntityNotification(notification));
+                       while (true) {
+                               try {
+                                       if (hasNext()) {
+                                               EntityNotification notification 
= consumer.peek();
+                                               if (notification != null) {
+                                                       if 
(LOG.isDebugEnabled()) {
+                                                               
LOG.debug("Notification=" + getPrintableEntityNotification(notification));
+                                                       }
+
+                                                       ServiceTags serviceTags 
= AtlasNotificationMapper.processEntityNotification(notification);
+                                                       if (serviceTags == 
null) {
+                                                               
LOG.error("Failed to create ServiceTags for notification :" + 
getPrintableEntityNotification(notification));
+                                                       } else {
+                                                               
updateSink(serviceTags);
+                                                       }
                                                } else {
-                                                       updateSink(serviceTags);
+                                                       LOG.error("Null 
entityNotification received from Kafka!! Ignoring..");
                                                }
+                                               // Move iterator forward
+                                               consumer.next();
                                        }
+                               } catch (Exception exception) {
+                                       LOG.error("Caught exception..: ", 
exception);
+                                       return;
                                }
                        }
-                       LOG.info("Shutting down the Tag-Atlas-source thread");
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
index 9d9f25d..11ca2d8 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -46,6 +46,8 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
        private String atlasEndpoint;
        private long sleepTimeBetweenCycleInMillis;
 
+       private Thread myThread = null;
+
        public static void main(String[] args) {
 
                AtlasRESTTagSource atlasRESTTagSource = new 
AtlasRESTTagSource();
@@ -60,9 +62,19 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
 
                if (tagSink != null) {
 
-                       atlasRESTTagSource.initialize(props);
-                       atlasRESTTagSource.setTagSink(tagSink);
-                       atlasRESTTagSource.synchUp();
+                       if (atlasRESTTagSource.initialize(props)) {
+                               try {
+                                       tagSink.start();
+                                       atlasRESTTagSource.setTagSink(tagSink);
+                                       atlasRESTTagSource.synchUp();
+                               } catch (Exception exception) {
+                                       LOG.error("ServiceTags upload failed : 
", exception);
+                                       System.exit(1);
+                               }
+                       } else {
+                               LOG.error("AtlasRESTTagSource initialized 
failed, exiting.");
+                               System.exit(1);
+                       }
 
                } else {
                        LOG.error("TagSink initialialization failed, exiting.");
@@ -96,11 +108,18 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
        @Override
        public boolean start() {
 
-               Thread atlasRESTInvokerThread = new Thread(this);
-               atlasRESTInvokerThread.setDaemon(true);
-               atlasRESTInvokerThread.start();
+               myThread = new Thread(this);
+               myThread.setDaemon(true);
+               myThread.start();
 
-               return atlasRESTInvokerThread != null;
+               return true;
+       }
+
+       @Override
+       public void stop() {
+               if (myThread != null && myThread.isAlive()) {
+                       myThread.interrupt();
+               }
        }
 
        @Override
@@ -110,7 +129,7 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
                        LOG.debug("==> AtlasRESTTagSource.run()");
                }
 
-               while (!shutdown) {
+               while (true) {
 
                        synchUp();
 
@@ -121,18 +140,12 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
                                Thread.sleep(sleepTimeBetweenCycleInMillis);
 
                        } catch (InterruptedException exception) {
-                               LOG.error("Failed to wait for [" + 
sleepTimeBetweenCycleInMillis + "] milliseconds before checking for update to 
tagFileSource", exception);
+                               LOG.error("Interrupted..: ", exception);
+                               return;
                        }
                }
-               LOG.info("Shutting down the Tag-Atlasrest-source thread");
-       }
-
-       @Override
-       public boolean isChanged() {
-               return true;
        }
 
-       @Override
        public void synchUp() {
 
                AtlasRESTUtil atlasRESTUtil = new AtlasRESTUtil(atlasEndpoint);

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
index 7f4676a..d7f983e 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
@@ -27,6 +27,7 @@ import org.apache.atlas.typesystem.Struct;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.ranger.admin.client.datatype.RESTResponse;
 import org.apache.ranger.plugin.util.RangerRESTClient;
@@ -114,43 +115,46 @@ public class AtlasRESTUtil {
 
                                        Map<String, Object> entityResponse = 
atlasAPI(API_ATLAS_ENTITY + guid);
 
-                                       if (MapUtils.isNotEmpty(entityResponse) 
&& entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
+                                       Map<String, Object> definition = 
getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class);
 
-                                               Map<String, Object> definition 
= getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class);
-                                               Map<String, Object> 
traitsAttribute = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
+                                       Map<String, Object> traitsAttribute = 
getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
 
-                                               if 
(MapUtils.isNotEmpty(traitsAttribute)) {
+                                       if 
(MapUtils.isNotEmpty(traitsAttribute)) {
 
-                                                       List<IStruct> allTraits 
= new LinkedList<>();
+                                               List<IStruct> allTraits = new 
LinkedList<>();
 
-                                                       for (Map.Entry<String, 
Object> entry : traitsAttribute.entrySet()) {
+                                               for (Map.Entry<String, Object> 
entry : traitsAttribute.entrySet()) {
 
-                                                               Map<String, 
Object> trait = (Map<String, Object>) entry.getValue();
+                                                       Map<String, Object> 
trait = (Map<String, Object>) entry.getValue();
 
-                                                               Map<String, 
Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
-                                                               String 
traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
+                                                       Map<String, Object> 
traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
+                                                       String traitTypeName = 
getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
 
-                                                               List<IStruct> 
superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
+                                                       if 
(StringUtils.isEmpty(traitTypeName)) {
+                                                               continue;
+                                                       }
 
-                                                               Struct trait1 = 
new Struct(traitTypeName, traitValues);
+                                                       List<IStruct> 
superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
 
-                                                               
allTraits.add(trait1);
-                                                               
allTraits.addAll(superTypes);
-                                                       }
+                                                       Struct trait1 = new 
Struct(traitTypeName, traitValues);
+
+                                                       allTraits.add(trait1);
+                                                       
allTraits.addAll(superTypes);
+                                               }
 
-                                                       IReferenceableInstance 
entity = InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), 
true);
+                                               IReferenceableInstance entity = 
InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true);
 
-                                                       if (entity != null) {
-                                                               
AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(entity, 
allTraits);
-                                                               
ret.add(entityWithTraits);
-                                                       } else {
-                                                               if 
(LOG.isInfoEnabled()) {
-                                                                       
LOG.info("Could not create Atlas entity from its definition, type=" + type + ", 
guid=" + guid);
-                                                               }
+                                               if (entity != null) {
+                                                       AtlasEntityWithTraits 
entityWithTraits = new AtlasEntityWithTraits(entity, allTraits);
+                                                       
ret.add(entityWithTraits);
+                                               } else {
+                                                       if 
(LOG.isInfoEnabled()) {
+                                                               LOG.info("Could 
not create Atlas entity from its definition, type=" + type + ", guid=" + guid);
                                                        }
-
                                                }
+
                                        }
+
                                }
                        }
                        if (LOG.isDebugEnabled()) {
@@ -169,15 +173,12 @@ public class AtlasRESTUtil {
 
                Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + 
traitName);
 
-               if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) {
-
-                       Map<String, Object> definition = 
getAttribute(typeResponse, DEFINITION_ATTRIBUTE, Map.class);
+               Map<String, Object> definition = getAttribute(typeResponse, 
DEFINITION_ATTRIBUTE, Map.class);
 
-                       List traitTypes = getAttribute(definition, 
TRAIT_TYPES_ATTRIBUTE, List.class);
+               List traitTypes = getAttribute(definition, 
TRAIT_TYPES_ATTRIBUTE, List.class);
 
-                       if (traitTypes.size() > 0) {
-                               ret = (Map<String, Object>) traitTypes.get(0);
-                       }
+               if (CollectionUtils.isNotEmpty(traitTypes)) {
+                       ret = (Map<String, Object>) traitTypes.get(0);
                }
 
                if (LOG.isDebugEnabled()) {
@@ -197,28 +198,30 @@ public class AtlasRESTUtil {
 
                        List<String> superTypeNames = getAttribute(traitType, 
SUPER_TYPES_ATTRIBUTE, List.class);
 
-                       for (String superTypeName : superTypeNames) {
+                       if (CollectionUtils.isNotEmpty(superTypeNames)) {
+                               for (String superTypeName : superTypeNames) {
 
-                               Map<String, Object> superTraitType = 
getTraitType(superTypeName);
+                                       Map<String, Object> superTraitType = 
getTraitType(superTypeName);
 
-                               if (superTraitType != null) {
-                                       List<Map<String, Object>> 
attributeDefinitions = (List) 
superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
+                                       if (superTraitType != null) {
+                                               List<Map<String, Object>> 
attributeDefinitions = (List) 
superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
 
-                                       Map<String, Object> superTypeValues = 
new HashMap<>();
-                                       for (Map<String, Object> 
attributeDefinition : attributeDefinitions) {
+                                               Map<String, Object> 
superTypeValues = new HashMap<>();
+                                               for (Map<String, Object> 
attributeDefinition : attributeDefinitions) {
 
-                                               String attributeName = 
attributeDefinition.get(NAME_ATTRIBUTE).toString();
-                                               if 
(values.containsKey(attributeName)) {
-                                                       
superTypeValues.put(attributeName, values.get(attributeName));
+                                                       String attributeName = 
attributeDefinition.get(NAME_ATTRIBUTE).toString();
+                                                       if 
(values.containsKey(attributeName)) {
+                                                               
superTypeValues.put(attributeName, values.get(attributeName));
+                                                       }
                                                }
-                                       }
 
-                                       List<IStruct> superTraits = 
getTraitSuperTypes(getTraitType(superTypeName), values);
+                                               List<IStruct> superTraits = 
getTraitSuperTypes(getTraitType(superTypeName), values);
 
-                                       Struct superTrait = new 
Struct(superTypeName, superTypeValues);
+                                               Struct superTrait = new 
Struct(superTypeName, superTypeValues);
 
-                                       ret.add(superTrait);
-                                       ret.addAll(superTraits);
+                                               ret.add(superTrait);
+                                               ret.addAll(superTraits);
+                                       }
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
index e339d5e..e22681e 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
@@ -49,6 +49,8 @@ public class FileTagSource extends AbstractTagSource 
implements Runnable {
        private Properties properties;
        private long fileModTimeCheckIntervalInMs;
 
+       private Thread myThread = null;
+
        public static void main(String[] args) {
 
                FileTagSource fileTagSource = new FileTagSource();
@@ -69,9 +71,19 @@ public class FileTagSource extends AbstractTagSource 
implements Runnable {
 
                if (tagSink != null) {
 
-                       fileTagSource.initialize(props);
-                       fileTagSource.setTagSink(tagSink);
-                       fileTagSource.synchUp();
+                       if (fileTagSource.initialize(props)) {
+                               try {
+                                       tagSink.start();
+                                       fileTagSource.setTagSink(tagSink);
+                                       fileTagSource.synchUp();
+                               } catch (Exception exception) {
+                                       LOG.error("ServiceTags upload failed : 
", exception);
+                                       System.exit(1);
+                               }
+                       } else {
+                               LOG.error("FileTagSource initialized failed, 
exiting.");
+                               System.exit(1);
+                       }
 
                } else {
                        LOG.error("TagSink initialialization failed, exiting.");
@@ -177,11 +189,18 @@ public class FileTagSource extends AbstractTagSource 
implements Runnable {
        @Override
        public boolean start() {
 
-               Thread fileMonitoringThread = new Thread(this);
-               fileMonitoringThread.setDaemon(true);
-               fileMonitoringThread.start();
+               myThread = new Thread(this);
+               myThread.setDaemon(true);
+               myThread.start();
+
+               return true;
+       }
 
-               return fileMonitoringThread != null;
+       @Override
+       public void stop() {
+               if (myThread != null && myThread.isAlive()) {
+                       myThread.interrupt();
+               }
        }
 
        @Override
@@ -190,7 +209,7 @@ public class FileTagSource extends AbstractTagSource 
implements Runnable {
                        LOG.debug("==> FileTagSource.run()");
                }
 
-               while (!shutdown) {
+               while (true) {
 
                        try {
                                synchUp();
@@ -199,23 +218,14 @@ public class FileTagSource extends AbstractTagSource 
implements Runnable {
 
                                Thread.sleep(fileModTimeCheckIntervalInMs);
                        }
-                       catch (InterruptedException e) {
-                               LOG.error("Failed to wait for [" + 
fileModTimeCheckIntervalInMs + "] milliseconds before checking for update to 
tagFileSource", e);
-                       }
-                       catch (Throwable t) {
-                               LOG.error("tag-sync thread got an error", t);
+                       catch (InterruptedException exception) {
+                               LOG.error("Interrupted..: ", exception);
+                               return;
                        }
                }
-
-               LOG.info("Shutting down the Tag-file-source thread");
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== FileTagSource.run()");
-               }
        }
 
-       @Override
-       public  boolean isChanged() {
+       private boolean isChanged() {
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("==> FileTagSource.isChanged()");
@@ -240,7 +250,6 @@ public class FileTagSource extends AbstractTagSource 
implements Runnable {
                return ret;
        }
 
-       @Override
        public void synchUp() {
                if (isChanged()) {
                        if (LOG.isDebugEnabled()) {

Reply via email to