Repository: incubator-ranger Updated Branches: refs/heads/master d40a02054 -> b1b0fb16c
RANGER-838 : Tag-sync should be resilient to Ranger Admin availability Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/b1b0fb16 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/b1b0fb16 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/b1b0fb16 Branch: refs/heads/master Commit: b1b0fb16cd4197a67a61d8a1db2c40863739327c Parents: d40a020 Author: Abhay Kulkarni <[email protected]> Authored: Wed Jan 27 18:46:49 2016 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Tue Mar 1 18:48:45 2016 -0800 ---------------------------------------------------------------------- .../ranger/tagsync/model/AbstractTagSource.java | 25 +-- .../apache/ranger/tagsync/model/TagSink.java | 4 +- .../apache/ranger/tagsync/model/TagSource.java | 5 +- .../ranger/tagsync/process/TagSyncConfig.java | 17 ++ .../ranger/tagsync/process/TagSynchronizer.java | 55 +++--- .../tagsync/sink/tagadmin/TagAdminRESTSink.java | 170 ++++++++++++++----- .../tagsync/source/atlas/AtlasTagSource.java | 50 +++--- .../source/atlasrest/AtlasRESTTagSource.java | 45 +++-- .../tagsync/source/atlasrest/AtlasRESTUtil.java | 91 +++++----- .../tagsync/source/file/FileTagSource.java | 53 +++--- 10 files changed, 331 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java index d6baeb2..d46170a 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java @@ -27,7 +27,6 @@ import org.apache.ranger.plugin.util.ServiceTags; public abstract class AbstractTagSource implements TagSource { private static final Log LOG = LogFactory.getLog(AbstractTagSource.class); private TagSink tagSink; - protected boolean shutdown = false; @Override public void setTagSink(TagSink sink) { @@ -37,29 +36,31 @@ public abstract class AbstractTagSource implements TagSource { this.tagSink = sink; } } - @Override - public void synchUp() {} - public void updateSink(final ServiceTags serviceTags) { - if (serviceTags == null) { + protected void updateSink(final ServiceTags toUpload) { + if (toUpload == null) { if (LOG.isDebugEnabled()) { LOG.debug("No ServiceTags to upload"); } } else { if (LOG.isDebugEnabled()) { - String serviceTagsJSON = new Gson().toJson(serviceTags); - LOG.debug("Uploading serviceTags=" + serviceTagsJSON); + String toUploadJSON = new Gson().toJson(toUpload); + LOG.debug("Uploading serviceTags=" + toUploadJSON); } try { - tagSink.uploadServiceTags(serviceTags); + ServiceTags uploaded = tagSink.upload(toUpload); + + if (LOG.isDebugEnabled()) { + String uploadedJSON = new Gson().toJson(uploaded); + LOG.debug("Uploaded serviceTags=" + uploadedJSON); + } } catch (Exception exception) { - LOG.error("uploadServiceTags() failed..", exception); + String toUploadJSON = new Gson().toJson(toUpload); + LOG.error("Failed to upload serviceTags: " + toUploadJSON); + LOG.error("Exception : ", exception); } } } - public void stop() { - shutdown = true; - } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java index 9eb5319..ae66e60 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java @@ -26,5 +26,7 @@ import java.util.Properties; public interface TagSink { boolean initialize(Properties properties); - void uploadServiceTags(ServiceTags serviceTags) throws Exception; + ServiceTags upload(ServiceTags toUpload) throws Exception; + boolean start(); + void stop(); } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java index 7d19562..5ef6c57 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java @@ -19,6 +19,7 @@ package org.apache.ranger.tagsync.model; + import java.util.Properties; public interface TagSource { @@ -27,12 +28,8 @@ public interface TagSource { void setTagSink(TagSink sink); - void synchUp(); - boolean start(); void stop(); - boolean isChanged(); - } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java index ed8679a..9588d66 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java @@ -72,6 +72,10 @@ public class TagSyncConfig extends Configuration { private static final long DEFAULT_TAGSYNC_REST_SOURCE_DOWNLOAD_INTERVAL = 900000; + private static final String TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL_PROP = "ranger.tagsync.tagadmin.connection.check.interval"; + + private static final int DEFAULT_TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL = 2000; + private Properties props; public static TagSyncConfig getInstance() { @@ -286,6 +290,19 @@ public class TagSyncConfig extends Configuration { return prop.getProperty(TAGSYNC_SOURCE_ATLAS_CUSTOM_RESOURCE_MAPPERS_PROP); } + static public long getTagAdminConnectionCheckInterval(Properties prop) { + long ret = DEFAULT_TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL; + String val = prop.getProperty(TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL_PROP); + if (StringUtils.isNotBlank(val)) { + try { + ret = Long.valueOf(val); + } catch (NumberFormatException exception) { + // Ignore + } + } + return ret; + } + private TagSyncConfig() { super(false); init() ; http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java index 696ce5e..5155681 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java @@ -32,10 +32,12 @@ public class TagSynchronizer { private static final Logger LOG = Logger.getLogger(TagSynchronizer.class); - private boolean shutdownFlag = false; + private TagSink tagSink = null; private List<TagSource> tagSources; private Properties properties = null; + private final Object shutdownNotifier = new Object(); + public static void main(String[] args) { TagSynchronizer tagSynchronizer = new TagSynchronizer(); @@ -49,23 +51,28 @@ public class TagSynchronizer { boolean tagSynchronizerInitialized = tagSynchronizer.initialize(); if (tagSynchronizerInitialized) { - tagSynchronizer.run(); + try { + tagSynchronizer.run(); + } catch (Throwable t) { + LOG.error("main thread caught exception..:", t); + System.exit(1); + } } else { LOG.error("TagSynchronizer failed to initialize correctly, exiting.."); - System.exit(-1); + System.exit(1); } } - public TagSynchronizer() { - setProperties(null); + TagSynchronizer() { + this(null); } - public TagSynchronizer(Properties properties) { + TagSynchronizer(Properties properties) { setProperties(properties); } - public void setProperties(Properties properties) { + void setProperties(Properties properties) { if (properties == null || MapUtils.isEmpty(properties)) { this.properties = new Properties(); } else { @@ -89,7 +96,7 @@ public class TagSynchronizer { LOG.info("Initializing TAG source and sink"); - TagSink tagSink = initializeTagSink(properties); + tagSink = initializeTagSink(properties); if (tagSink != null) { @@ -113,39 +120,32 @@ public class TagSynchronizer { return ret; } - public void run() { + public void run() throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("==> TagSynchronizer.run()"); } - long shutdownCheckIntervalInMs = 60*1000; - - boolean tagSourcesStarted = true; - try { + boolean threadsStarted = tagSink.start(); + for (TagSource tagSource : tagSources) { - tagSourcesStarted = tagSourcesStarted && tagSource.start(); + threadsStarted = threadsStarted && tagSource.start(); } - if (tagSourcesStarted) { - while (!shutdownFlag) { - try { - LOG.debug("Sleeping for [" + shutdownCheckIntervalInMs + "] milliSeconds"); - Thread.sleep(shutdownCheckIntervalInMs); - } catch (InterruptedException e) { - LOG.error("Failed to wait for [" + shutdownCheckIntervalInMs + "] milliseconds before attempting to synchronize tag information ", e); - break; - } + if (threadsStarted) { + synchronized(shutdownNotifier) { + shutdownNotifier.wait(); } } - } catch (Throwable t) { - LOG.error("tag-sync main thread got an error", t); } finally { LOG.info("Stopping all tagSources"); for (TagSource tagSource : tagSources) { tagSource.stop(); } + + LOG.info("Stopping tagSink"); + tagSink.stop(); } if (LOG.isDebugEnabled()) { @@ -155,7 +155,10 @@ public class TagSynchronizer { public void shutdown(String reason) { LOG.info("Received shutdown(), reason=" + reason); - this.shutdownFlag = true; + + synchronized(shutdownNotifier) { + shutdownNotifier.notifyAll(); + } } static public void printConfigurationProperties(Properties properties) { http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java index 1541034..c296b49 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java @@ -33,10 +33,15 @@ import org.apache.ranger.plugin.util.SearchFilter; import org.apache.ranger.plugin.util.ServiceTags; import org.apache.ranger.tagsync.process.TagSyncConfig; +import javax.servlet.http.HttpServletResponse; import java.util.Map; import java.util.Properties; -public class TagAdminRESTSink implements TagSink { +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class TagAdminRESTSink implements TagSink, Runnable { private static final Log LOG = LogFactory.getLog(TagAdminRESTSink.class); private static final String REST_PREFIX = "/service"; @@ -46,45 +51,43 @@ public class TagAdminRESTSink implements TagSink { private static final String REST_URL_IMPORT_SERVICETAGS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/importservicetags/"; + private long rangerAdminConnectionCheckInterval; + private RangerRESTClient tagRESTClient = null; + private BlockingQueue<UploadWorkItem> uploadWorkItems; + + private Thread myThread = null; + @Override public boolean initialize(Properties properties) { if(LOG.isDebugEnabled()) { LOG.debug("==> TagAdminRESTSink.initialize()"); } - boolean ret = true; + boolean ret = false; String restUrl = TagSyncConfig.getTagAdminRESTUrl(properties); String sslConfigFile = TagSyncConfig.getTagAdminRESTSslConfigFile(properties); String userName = TagSyncConfig.getTagAdminUserName(properties); String password = TagSyncConfig.getTagAdminPassword(properties); + rangerAdminConnectionCheckInterval = TagSyncConfig.getTagAdminConnectionCheckInterval(properties); if (LOG.isDebugEnabled()) { LOG.debug("restUrl=" + restUrl); LOG.debug("sslConfigFile=" + sslConfigFile); LOG.debug("userName=" + userName); + LOG.debug("rangerAdminConnectionCheckInterval" + rangerAdminConnectionCheckInterval); } - if (StringUtils.isBlank(restUrl)) { - ret = false; - LOG.error("No value specified for property 'ranger.tagsync.tagadmin.rest.url'!"); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("ranger.tagsync.tagadmin.rest.url:" + restUrl); - } - } - - if (ret) { + if (StringUtils.isNotBlank(restUrl)) { tagRESTClient = new RangerRESTClient(restUrl, sslConfigFile); tagRESTClient.setBasicAuthInfo(userName, password); + uploadWorkItems = new LinkedBlockingQueue<UploadWorkItem>(); - ret = testConnection(); - } - - if (!ret) { - LOG.error("Cannot connect to Tag Admin. Please recheck configuration properties and/or check if Tag Admin is running and responsive"); + ret = true; + } else { + LOG.error("No value specified for property 'ranger.tagsync.tagadmin.rest.url'!"); } if(LOG.isDebugEnabled()) { @@ -94,51 +97,53 @@ public class TagAdminRESTSink implements TagSink { return ret; } - public boolean testConnection() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagAdminRESTSink.testConnection()"); + @Override + public ServiceTags upload(ServiceTags toUpload) throws Exception { + + if(LOG.isDebugEnabled()) { + LOG.debug("==> upload() "); } - boolean ret = true; + UploadWorkItem uploadWorkItem = new UploadWorkItem(toUpload); - try { - // build a dummy serviceTags structure and upload it to test connectivity - ServiceTags serviceTags = new ServiceTags(); - serviceTags.setOp(ServiceTags.OP_ADD_OR_UPDATE); - uploadServiceTags(serviceTags); - } catch (Exception exception) { - LOG.error("test-upload of serviceTags failed.", exception); - ret = false; - } + uploadWorkItems.put(uploadWorkItem); - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagAdminRESTSink.testConnection(), result=" + ret); + // Wait until message is successfully delivered + ServiceTags ret = uploadWorkItem.waitForUpload(); + + if(LOG.isDebugEnabled()) { + LOG.debug("<== upload()"); } + return ret; } - @Override - synchronized public void uploadServiceTags(ServiceTags serviceTags) throws Exception { + private ServiceTags doUpload(ServiceTags serviceTags) throws Exception { if(LOG.isDebugEnabled()) { - LOG.debug("==> uploadServiceTags()"); + LOG.debug("==> doUpload()"); } WebResource webResource = createWebResource(REST_URL_IMPORT_SERVICETAGS_RESOURCE); ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).put(ClientResponse.class, tagRESTClient.toJson(serviceTags)); - if(response == null || response.getStatus() != 204) { - LOG.error("RangerAdmin REST call returned with response={" + response + "}"); + if(response == null || response.getStatus() != HttpServletResponse.SC_NO_CONTENT) { RESTResponse resp = RESTResponse.fromClientResponse(response); LOG.error("Upload of service-tags failed with message " + resp.getMessage()); - throw new Exception("Upload of service-tags failed with response: " + response); + if (response == null || response.getStatus() != HttpServletResponse.SC_BAD_REQUEST) { + // NOT an application error + throw new Exception("Upload of service-tags failed with response: " + response); + } + } if(LOG.isDebugEnabled()) { - LOG.debug("<== uploadServiceTags()"); + LOG.debug("<== doUpload()"); } + + return serviceTags; } private WebResource createWebResource(String url) { @@ -159,4 +164,91 @@ public class TagAdminRESTSink implements TagSink { return ret; } + + @Override + public boolean start() { + + myThread = new Thread(this); + myThread.setDaemon(true); + myThread.start(); + + return true; + } + + @Override + public void stop() { + if (myThread != null && myThread.isAlive()) { + myThread.interrupt(); + } + } + + @Override + public void run() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagAdminRESTSink.run()"); + } + + while (true) { + UploadWorkItem uploadWorkItem; + + try { + uploadWorkItem = uploadWorkItems.take(); + + ServiceTags toUpload = uploadWorkItem.getServiceTags(); + + boolean doRetry; + + do { + doRetry = false; + + try { + ServiceTags uploaded = doUpload(toUpload); + // ServiceTags uploaded successfully + uploadWorkItem.uploadCompleted(uploaded); + } catch (InterruptedException interrupted) { + LOG.error("Caught exception..: ", interrupted); + return; + } catch (Exception exception) { + doRetry = true; + Thread.sleep(rangerAdminConnectionCheckInterval); + } + } while (doRetry); + + } + catch (InterruptedException exception) { + LOG.error("Interrupted..: ", exception); + return; + } + } + + } + + class UploadWorkItem { + private ServiceTags serviceTags; + private BlockingQueue<ServiceTags> uploadedServiceTags; + + ServiceTags getServiceTags() { + return serviceTags; + } + + ServiceTags waitForUpload() throws InterruptedException { + return uploadedServiceTags.take(); + } + + void uploadCompleted(ServiceTags uploaded) throws InterruptedException { + // ServiceTags uploaded successfully + uploadedServiceTags.put(uploaded); + } + + UploadWorkItem(ServiceTags serviceTags) { + setServiceTags(serviceTags); + uploadedServiceTags = new ArrayBlockingQueue<ServiceTags>(1); + } + + void setServiceTags(ServiceTags serviceTags) { + this.serviceTags = serviceTags; + } + + } + } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java index 2499177..49d6f61 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java @@ -50,6 +50,7 @@ public class AtlasTagSource extends AbstractTagSource { public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id"; private ConsumerRunnable consumerTask; + private Thread myThread = null; @Override public boolean initialize(Properties properties) { @@ -123,23 +124,24 @@ public class AtlasTagSource extends AbstractTagSource { if (LOG.isDebugEnabled()) { LOG.debug("==> AtlasTagSource.start()"); } - Thread consumerThread = null; if (consumerTask == null) { LOG.error("No consumerTask!!!"); } else { - consumerThread = new Thread(consumerTask); - consumerThread.setDaemon(true); - consumerThread.start(); + myThread = new Thread(consumerTask); + myThread.setDaemon(true); + myThread.start(); } if (LOG.isDebugEnabled()) { LOG.debug("<== AtlasTagSource.start()"); } - return consumerThread != null; + return myThread != null; } @Override - public boolean isChanged() { - return true; + public void stop() { + if (myThread != null && myThread.isAlive()) { + myThread.interrupt(); + } } private static String getPrintableEntityNotification(EntityNotification notification) { @@ -175,24 +177,32 @@ public class AtlasTagSource extends AbstractTagSource { if (LOG.isDebugEnabled()) { LOG.debug("==> ConsumerRunnable.run()"); } - while (!shutdown) { - if (hasNext()) { - EntityNotification notification = consumer.next(); - if (notification != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Notification=" + getPrintableEntityNotification(notification)); - } - - ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification); - if (serviceTags == null) { - LOG.error("Failed to create ServiceTags for notification :" + getPrintableEntityNotification(notification)); + while (true) { + try { + if (hasNext()) { + EntityNotification notification = consumer.peek(); + if (notification != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Notification=" + getPrintableEntityNotification(notification)); + } + + ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification); + if (serviceTags == null) { + LOG.error("Failed to create ServiceTags for notification :" + getPrintableEntityNotification(notification)); + } else { + updateSink(serviceTags); + } } else { - updateSink(serviceTags); + LOG.error("Null entityNotification received from Kafka!! Ignoring.."); } + // Move iterator forward + consumer.next(); } + } catch (Exception exception) { + LOG.error("Caught exception..: ", exception); + return; } } - LOG.info("Shutting down the Tag-Atlas-source thread"); } } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java index 9d9f25d..11ca2d8 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java @@ -46,6 +46,8 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { private String atlasEndpoint; private long sleepTimeBetweenCycleInMillis; + private Thread myThread = null; + public static void main(String[] args) { AtlasRESTTagSource atlasRESTTagSource = new AtlasRESTTagSource(); @@ -60,9 +62,19 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { if (tagSink != null) { - atlasRESTTagSource.initialize(props); - atlasRESTTagSource.setTagSink(tagSink); - atlasRESTTagSource.synchUp(); + if (atlasRESTTagSource.initialize(props)) { + try { + tagSink.start(); + atlasRESTTagSource.setTagSink(tagSink); + atlasRESTTagSource.synchUp(); + } catch (Exception exception) { + LOG.error("ServiceTags upload failed : ", exception); + System.exit(1); + } + } else { + LOG.error("AtlasRESTTagSource initialized failed, exiting."); + System.exit(1); + } } else { LOG.error("TagSink initialialization failed, exiting."); @@ -96,11 +108,18 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { @Override public boolean start() { - Thread atlasRESTInvokerThread = new Thread(this); - atlasRESTInvokerThread.setDaemon(true); - atlasRESTInvokerThread.start(); + myThread = new Thread(this); + myThread.setDaemon(true); + myThread.start(); - return atlasRESTInvokerThread != null; + return true; + } + + @Override + public void stop() { + if (myThread != null && myThread.isAlive()) { + myThread.interrupt(); + } } @Override @@ -110,7 +129,7 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { LOG.debug("==> AtlasRESTTagSource.run()"); } - while (!shutdown) { + while (true) { synchUp(); @@ -121,18 +140,12 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { Thread.sleep(sleepTimeBetweenCycleInMillis); } catch (InterruptedException exception) { - LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before checking for update to tagFileSource", exception); + LOG.error("Interrupted..: ", exception); + return; } } - LOG.info("Shutting down the Tag-Atlasrest-source thread"); - } - - @Override - public boolean isChanged() { - return true; } - @Override public void synchUp() { AtlasRESTUtil atlasRESTUtil = new AtlasRESTUtil(atlasEndpoint); http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java index 7f4676a..d7f983e 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java @@ -27,6 +27,7 @@ import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.ranger.admin.client.datatype.RESTResponse; import org.apache.ranger.plugin.util.RangerRESTClient; @@ -114,43 +115,46 @@ public class AtlasRESTUtil { Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid); - if (MapUtils.isNotEmpty(entityResponse) && entityResponse.containsKey(DEFINITION_ATTRIBUTE)) { + Map<String, Object> definition = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class); - Map<String, Object> definition = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class); - Map<String, Object> traitsAttribute = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class); + Map<String, Object> traitsAttribute = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class); - if (MapUtils.isNotEmpty(traitsAttribute)) { + if (MapUtils.isNotEmpty(traitsAttribute)) { - List<IStruct> allTraits = new LinkedList<>(); + List<IStruct> allTraits = new LinkedList<>(); - for (Map.Entry<String, Object> entry : traitsAttribute.entrySet()) { + for (Map.Entry<String, Object> entry : traitsAttribute.entrySet()) { - Map<String, Object> trait = (Map<String, Object>) entry.getValue(); + Map<String, Object> trait = (Map<String, Object>) entry.getValue(); - Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class); - String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class); + Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class); + String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class); - List<IStruct> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues); + if (StringUtils.isEmpty(traitTypeName)) { + continue; + } - Struct trait1 = new Struct(traitTypeName, traitValues); + List<IStruct> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues); - allTraits.add(trait1); - allTraits.addAll(superTypes); - } + Struct trait1 = new Struct(traitTypeName, traitValues); + + allTraits.add(trait1); + allTraits.addAll(superTypes); + } - IReferenceableInstance entity = InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true); + IReferenceableInstance entity = InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true); - if (entity != null) { - AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(entity, allTraits); - ret.add(entityWithTraits); - } else { - if (LOG.isInfoEnabled()) { - LOG.info("Could not create Atlas entity from its definition, type=" + type + ", guid=" + guid); - } + if (entity != null) { + AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(entity, allTraits); + ret.add(entityWithTraits); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("Could not create Atlas entity from its definition, type=" + type + ", guid=" + guid); } - } + } + } } if (LOG.isDebugEnabled()) { @@ -169,15 +173,12 @@ public class AtlasRESTUtil { Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName); - if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) { - - Map<String, Object> definition = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, Map.class); + Map<String, Object> definition = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, Map.class); - List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class); + List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class); - if (traitTypes.size() > 0) { - ret = (Map<String, Object>) traitTypes.get(0); - } + if (CollectionUtils.isNotEmpty(traitTypes)) { + ret = (Map<String, Object>) traitTypes.get(0); } if (LOG.isDebugEnabled()) { @@ -197,28 +198,30 @@ public class AtlasRESTUtil { List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class); - for (String superTypeName : superTypeNames) { + if (CollectionUtils.isNotEmpty(superTypeNames)) { + for (String superTypeName : superTypeNames) { - Map<String, Object> superTraitType = getTraitType(superTypeName); + Map<String, Object> superTraitType = getTraitType(superTypeName); - if (superTraitType != null) { - List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE); + if (superTraitType != null) { + List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE); - Map<String, Object> superTypeValues = new HashMap<>(); - for (Map<String, Object> attributeDefinition : attributeDefinitions) { + Map<String, Object> superTypeValues = new HashMap<>(); + for (Map<String, Object> attributeDefinition : attributeDefinitions) { - String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString(); - if (values.containsKey(attributeName)) { - superTypeValues.put(attributeName, values.get(attributeName)); + String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString(); + if (values.containsKey(attributeName)) { + superTypeValues.put(attributeName, values.get(attributeName)); + } } - } - List<IStruct> superTraits = getTraitSuperTypes(getTraitType(superTypeName), values); + List<IStruct> superTraits = getTraitSuperTypes(getTraitType(superTypeName), values); - Struct superTrait = new Struct(superTypeName, superTypeValues); + Struct superTrait = new Struct(superTypeName, superTypeValues); - ret.add(superTrait); - ret.addAll(superTraits); + ret.add(superTrait); + ret.addAll(superTraits); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java index e339d5e..e22681e 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java @@ -49,6 +49,8 @@ public class FileTagSource extends AbstractTagSource implements Runnable { private Properties properties; private long fileModTimeCheckIntervalInMs; + private Thread myThread = null; + public static void main(String[] args) { FileTagSource fileTagSource = new FileTagSource(); @@ -69,9 +71,19 @@ public class FileTagSource extends AbstractTagSource implements Runnable { if (tagSink != null) { - fileTagSource.initialize(props); - fileTagSource.setTagSink(tagSink); - fileTagSource.synchUp(); + if (fileTagSource.initialize(props)) { + try { + tagSink.start(); + fileTagSource.setTagSink(tagSink); + fileTagSource.synchUp(); + } catch (Exception exception) { + LOG.error("ServiceTags upload failed : ", exception); + System.exit(1); + } + } else { + LOG.error("FileTagSource initialized failed, exiting."); + System.exit(1); + } } else { LOG.error("TagSink initialialization failed, exiting."); @@ -177,11 +189,18 @@ public class FileTagSource extends AbstractTagSource implements Runnable { @Override public boolean start() { - Thread fileMonitoringThread = new Thread(this); - fileMonitoringThread.setDaemon(true); - fileMonitoringThread.start(); + myThread = new Thread(this); + myThread.setDaemon(true); + myThread.start(); + + return true; + } - return fileMonitoringThread != null; + @Override + public void stop() { + if (myThread != null && myThread.isAlive()) { + myThread.interrupt(); + } } @Override @@ -190,7 +209,7 @@ public class FileTagSource extends AbstractTagSource implements Runnable { LOG.debug("==> FileTagSource.run()"); } - while (!shutdown) { + while (true) { try { synchUp(); @@ -199,23 +218,14 @@ public class FileTagSource extends AbstractTagSource implements Runnable { Thread.sleep(fileModTimeCheckIntervalInMs); } - catch (InterruptedException e) { - LOG.error("Failed to wait for [" + fileModTimeCheckIntervalInMs + "] milliseconds before checking for update to tagFileSource", e); - } - catch (Throwable t) { - LOG.error("tag-sync thread got an error", t); + catch (InterruptedException exception) { + LOG.error("Interrupted..: ", exception); + return; } } - - LOG.info("Shutting down the Tag-file-source thread"); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== FileTagSource.run()"); - } } - @Override - public boolean isChanged() { + private boolean isChanged() { if (LOG.isDebugEnabled()) { LOG.debug("==> FileTagSource.isChanged()"); @@ -240,7 +250,6 @@ public class FileTagSource extends AbstractTagSource implements Runnable { return ret; } - @Override public void synchUp() { if (isChanged()) { if (LOG.isDebugEnabled()) {
