RANGER-807: TagSync should support periodic full sync with Apache Atlas Signed-off-by: Madhan Neethiraj <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/5b9c094f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/5b9c094f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/5b9c094f Branch: refs/heads/master Commit: 5b9c094ff501ec591f6334bcea10ce37a2163711 Parents: fec8460 Author: Abhay Kulkarni <[email protected]> Authored: Tue Dec 15 19:08:02 2015 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Sat Jan 9 23:59:10 2016 -0800 ---------------------------------------------------------------------- .../ranger/plugin/util/RangerRESTClient.java | 3 +- .../apache/ranger/plugin/util/ServiceTags.java | 27 +- .../conf/templates/installprop2xml.properties | 6 +- .../conf/templates/ranger-tagsync-template.xml | 13 +- tagsync/scripts/install.properties | 12 +- tagsync/scripts/ranger-tagsync-services.sh | 6 +- tagsync/scripts/ranger-tagsync-sync.sh | 66 ++++ tagsync/scripts/setup.py | 19 +- .../ranger/tagsync/model/AbstractTagSource.java | 65 ++++ .../apache/ranger/tagsync/model/TagSource.java | 6 +- .../ranger/tagsync/process/TagSyncConfig.java | 68 ++-- .../ranger/tagsync/process/TagSynchronizer.java | 193 +++++++---- .../tagsync/sink/tagadmin/TagAdminRESTSink.java | 162 +++++++++ .../tagsync/sink/tagadmin/TagRESTSink.java | 160 --------- .../source/Atlas/AtlasHiveResourceMapper.java | 203 ++++++++++++ .../source/Atlas/AtlasResourceMapper.java | 74 +++++ .../source/Atlas/AtlasResourceMapperUtil.java | 124 +++++++ .../tagsync/source/Atlas/AtlasTagSource.java | 197 +++++++++++ .../source/atlas/AtlasEntityWithTraits.java | 96 ++++++ .../source/atlas/AtlasNotificationMapper.java | 327 ++++++------------- .../tagsync/source/atlas/TagAtlasSource.java | 241 -------------- .../source/atlasrest/AtlasRESTTagSource.java | 143 ++++++++ .../tagsync/source/atlasrest/AtlasRESTUtil.java | 264 +++++++++++++++ .../tagsync/source/file/FileTagSource.java | 278 ++++++++++++++++ .../tagsync/source/file/TagFileSource.java | 281 ---------------- .../main/resources/etc/ranger/data/tags.json | 26 +- .../main/resources/ranger-tagsync-default.xml | 5 + .../tagsync/process/TestTagSynchronizer.java | 22 +- 28 files changed, 2020 insertions(+), 1067 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTClient.java ---------------------------------------------------------------------- diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTClient.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTClient.java index c311670..7cfd040 100644 --- a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTClient.java +++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTClient.java @@ -212,8 +212,7 @@ public class RangerRESTClient { client = Client.create(config); } - // TODO: for testing only - if(!StringUtils.isEmpty(mUsername) || !StringUtils.isEmpty(mPassword)) { + if(!StringUtils.isEmpty(mUsername) && !StringUtils.isEmpty(mPassword)) { client.addFilter(new HTTPBasicAuthFilter(mUsername, mPassword)); } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/agents-common/src/main/java/org/apache/ranger/plugin/util/ServiceTags.java ---------------------------------------------------------------------- diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/ServiceTags.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/ServiceTags.java index 7800e9a..c3fb4cf 100644 --- a/agents-common/src/main/java/org/apache/ranger/plugin/util/ServiceTags.java +++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/ServiceTags.java @@ -23,6 +23,8 @@ package org.apache.ranger.plugin.util; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; @@ -51,7 +53,6 @@ public class ServiceTags implements java.io.Serializable { public static final String TAGMODEL_SHARED = "shared"; public static final String TAGMODEL_RESOURCE_PRIVATE = "resource_private"; - private String op = OP_ADD_OR_UPDATE; private String tagModel = TAGMODEL_SHARED; private String serviceName; @@ -62,6 +63,22 @@ public class ServiceTags implements java.io.Serializable { private List<RangerServiceResource> serviceResources; private Map<Long, List<Long>> resourceToTagIds; + public ServiceTags() { + this(OP_ADD_OR_UPDATE, TAGMODEL_SHARED, null, 0L, null, null, null, null, null); + } + + public ServiceTags(String op, String tagModel, String serviceName, Long tagVersion, Date tagUpdateTime, Map<Long, RangerTagDef> tagDefinitions, + Map<Long, RangerTag> tags, List<RangerServiceResource> serviceResources, Map<Long, List<Long>> resourceToTagIds) { + setOp(op); + setTagModel(tagModel); + setServiceName(serviceName); + setTagVersion(tagVersion); + setTagUpdateTime(tagUpdateTime); + setTagDefinitions(tagDefinitions); + setTags(tags); + setServiceResources(serviceResources); + setResourceToTagIds(resourceToTagIds); + } /** * @return the op */ @@ -138,7 +155,7 @@ public class ServiceTags implements java.io.Serializable { } public void setTagDefinitions(Map<Long, RangerTagDef> tagDefinitions) { - this.tagDefinitions = tagDefinitions; + this.tagDefinitions = tagDefinitions == null ? new HashMap<Long, RangerTagDef>() : tagDefinitions; } public Map<Long, RangerTag> getTags() { @@ -146,7 +163,7 @@ public class ServiceTags implements java.io.Serializable { } public void setTags(Map<Long, RangerTag> tags) { - this.tags = tags; + this.tags = tags == null ? new HashMap<Long, RangerTag>() : tags; } public List<RangerServiceResource> getServiceResources() { @@ -154,7 +171,7 @@ public class ServiceTags implements java.io.Serializable { } public void setServiceResources(List<RangerServiceResource> serviceResources) { - this.serviceResources = serviceResources; + this.serviceResources = serviceResources == null ? new ArrayList<RangerServiceResource>() : serviceResources; } public Map<Long, List<Long>> getResourceToTagIds() { @@ -162,7 +179,7 @@ public class ServiceTags implements java.io.Serializable { } public void setResourceToTagIds(Map<Long, List<Long>> resourceToTagIds) { - this.resourceToTagIds = resourceToTagIds; + this.resourceToTagIds = resourceToTagIds == null ? new HashMap<Long, List<Long>>() : resourceToTagIds; } @Override http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/conf/templates/installprop2xml.properties ---------------------------------------------------------------------- diff --git a/tagsync/conf/templates/installprop2xml.properties b/tagsync/conf/templates/installprop2xml.properties index 101a1ba..a6840b0 100644 --- a/tagsync/conf/templates/installprop2xml.properties +++ b/tagsync/conf/templates/installprop2xml.properties @@ -29,13 +29,15 @@ TAGADMIN_SSL_CONFIG_FILENAME = ranger.tagsync.tagadmin.rest.ssl.config.file TAGSYNC_KEYSTORE_FILENAME = ranger.tagsync.tagadmin.keystore - +TAGSYNC_FILESOURCE_FILENAME = ranger.tagsync.filesource.filename TAG_SOURCE_FILE_MOD_TIME_CHECK_INTERVAL = ranger.tagsync.filesource.modtime.check.interval -TAGSYNC_FILESOURCE_FILENAME = ranger.tagsync.filesource.filename +TAG_SOURCE_ATLAS_REST_URL = ranger.tagsync.atlasrestsource.endpoint +TAG_SOURCE_ATLAS_REST_DOWNLOAD_INTERVAL = ranger.tagsync.atlasrestsource.download.interval TAGSYNC_ATLAS_KAFKA_ENDPOINTS = atlas.kafka.bootstrap.servers TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = atlas.kafka.zookeeper.connect TAGSYNC_ATLAS_CONSUMER_GROUP = atlas.kafka.entities.group.id TAGSYNC_ATLAS_TO_RANGER_SERVICE_MAPPING = ranger.tagsync.atlas.to.service.mapping +TAGSYNC_SOURCE_ATLAS_CUSTOM_RESOURCE_MAPPERS = ranger.tagsync.source.atlas.custom.resource.mappers \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/conf/templates/ranger-tagsync-template.xml ---------------------------------------------------------------------- diff --git a/tagsync/conf/templates/ranger-tagsync-template.xml b/tagsync/conf/templates/ranger-tagsync-template.xml index 9a88681..bad71bd 100644 --- a/tagsync/conf/templates/ranger-tagsync-template.xml +++ b/tagsync/conf/templates/ranger-tagsync-template.xml @@ -32,6 +32,10 @@ <value></value> </property> <property> + <name>ranger.tagsync.atlasrestsource.download.interval</name> + <value></value> + </property> + <property> <name>ranger.tagsync.tagadmin.rest.ssl.config.file</name> <value></value> </property> @@ -59,4 +63,11 @@ <name>ranger.tagsync.atlas.to.service.mapping</name> <value></value> </property> -</configuration> + <property> + <name>ranger.tagsync.atlasrestsource.endpoint</name> + <value></value> + </property> + <property> + <name>ranger.tagsync.source.atlas.custom.resource.mappers</name> + <value></value> + </property></configuration> http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/scripts/install.properties ---------------------------------------------------------------------- diff --git a/tagsync/scripts/install.properties b/tagsync/scripts/install.properties index 95e87e5..b6665d1 100644 --- a/tagsync/scripts/install.properties +++ b/tagsync/scripts/install.properties @@ -37,9 +37,13 @@ TAGADMIN_ENDPOINT = http://localhost:6080 # SSL config file name for TagAdmin TAGADMIN_SSL_CONFIG_FILENAME = -# Source for tags (either 'atlas' or 'file') +# Source for tags (either 'atlas' or 'file' or 'atlasrest') TAG_SOURCE = atlas +TAG_SOURCE_ATLAS_REST_URL = http://localhost:21000 + +# Interval for checking the source for any changes in case of TAG_SOURCE = atlasrest +TAG_SOURCE_ATLAS_REST_DOWNLOAD_INTERVAL = 900000 # File name to be used for reading tags information if TAG_SOURCE = file @@ -73,3 +77,9 @@ TAGSYNC_ATLAS_CONSUMER_GROUP = ranger_entities_consumer # TAGSYNC_ATLAS_TO_RANGER_SERVICE_MAPPING= + +# A comma separated list of custom mapper class names which convert Atlas entities to +# RangerServiceResource structures are specified here. If there are no custom mappers, +# then it can be left blank + +TAGSYNC_SOURCE_ATLAS_CUSTOM_RESOURCE_MAPPERS= \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/scripts/ranger-tagsync-services.sh ---------------------------------------------------------------------- diff --git a/tagsync/scripts/ranger-tagsync-services.sh b/tagsync/scripts/ranger-tagsync-services.sh index ca82ead..058ebf3 100755 --- a/tagsync/scripts/ranger-tagsync-services.sh +++ b/tagsync/scripts/ranger-tagsync-services.sh @@ -16,8 +16,8 @@ # limitations under the License. if [[ -z $1 ]]; then - echo "Invalid argument [$1];" - echo "Usage: Only start | stop | restart | version, are supported." + echo "No argument provided.." + echo "Usage: $0 {start | stop | restart | version}" exit; fi action=$1 @@ -119,7 +119,7 @@ elif [ "${action}" == "VERSION" ]; then exit else echo "Invalid argument [$1];" - echo "Usage: Only start | stop | restart | version, are supported." + echo "Usage: $0 {start | stop | restart | version}" exit; fi http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/scripts/ranger-tagsync-sync.sh ---------------------------------------------------------------------- diff --git a/tagsync/scripts/ranger-tagsync-sync.sh b/tagsync/scripts/ranger-tagsync-sync.sh new file mode 100755 index 0000000..f8990e3 --- /dev/null +++ b/tagsync/scripts/ranger-tagsync-sync.sh @@ -0,0 +1,66 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if [[ -z $1 ]]; then + echo "No argument provided.." + echo "Usage: $0 {FILE | ATLAS_DOWNLOAD}" + exit -1; +fi +action=$1 +action=`echo $action | tr '[:lower:]' '[:upper:]'` +realScriptPath=`readlink -f $0` +realScriptDir=`dirname $realScriptPath` +cd $realScriptDir +cdir=`pwd` + +if [ "${action}" == "FILE" ]; then + action=file +elif [ "${action}" == "ATLAS_DOWNLOAD" ]; then + action=atlasrest +else + echo "Invalid argument [$action];" + echo "Usage: $0 {FILE | ATLAS_DOWNLOAD}" + exit -1; +fi + +if [ -f ${cdir}/conf/java_home.sh ]; then + . ${cdir}/conf/java_home.sh +fi + +for custom_env_script in `find ${cdir}/conf.dist/ -name "ranger-tagsync-env*"`; do + if [ -f $custom_env_script ]; then + . $custom_env_script + fi +done + +if [ "$JAVA_HOME" != "" ]; then + export PATH=$JAVA_HOME/bin:$PATH +fi + +logdir=/var/log/ranger/tagsync-$action + +if [ ! -d $logdir ]; then + mkdir -p $logdir + chmod 777 $logdir +fi + +cp="${cdir}/conf:${cdir}/dist/*:${cdir}/lib/*" + +cd ${cdir} +umask 0077 +java -Dproc_rangertagsync-$action ${JAVA_OPTS} -Dlogdir="${logdir}" -cp "${cp}" org.apache.ranger.tagsync.process.TagSynchronizer $action > ${logdir}/tagsync.out 2>&1 + http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/scripts/setup.py ---------------------------------------------------------------------- diff --git a/tagsync/scripts/setup.py b/tagsync/scripts/setup.py index 1c8552b..c100f73 100755 --- a/tagsync/scripts/setup.py +++ b/tagsync/scripts/setup.py @@ -77,8 +77,8 @@ TAGSYNC_INSTALL_PROP_PREFIX_FOR_ATLAS_RANGER_MAPPING = 'ranger.tagsync.atlas.' TAGSYNC_ATLAS_CLUSTER_IDENTIFIER = '.instance.' TAGSYNC_INSTALL_PROP_SUFFIX_FOR_ATLAS_RANGER_MAPPING = '.ranger.service' TAG_SOURCE_ATLAS = 'atlas' +TAG_SOURCE_ATLASREST = 'atlasrest' TAG_SOURCE_FILE = 'file' -TAG_SOURCE_LIST = [ TAG_SOURCE_ATLAS, TAG_SOURCE_FILE ] def archiveFile(originalFileName): archiveDir = dirname(originalFileName) @@ -199,20 +199,11 @@ def convertInstallPropsToXML(props): else: print "Direct Key not found:%s" % (k) - ret['ranger.tagsync.sink.impl.class'] = 'org.apache.ranger.sink.policymgr.TagRESTSink' + ret['ranger.tagsync.sink.impl.class'] = 'org.apache.ranger.sink.policymgr.TagAdminRESTSink' + if (TAG_SOURCE_KEY in ret): - tagSource = ret[TAG_SOURCE_KEY] - if (tagSource == TAG_SOURCE_ATLAS): - ret['ranger.tagsync.source.impl.class'] = 'atlas' - elif (tagSource == TAG_SOURCE_FILE): - ret['ranger.tagsync.source.impl.class'] = 'file' - else: - print "ERROR: Invalid value (%s) defined for %s in install.properties. Only valid values are %s" % (tagSource, TAG_SOURCE_ATLAS,TAG_SOURCE_FILE) - sys.exit(1) - del ret['TAG_SOURCE'] - else: - print "ERROR: No value defined for TAG_SOURCE in install.properties. valid values are %s" % (TAG_SOURCE_ATLAS, TAG_SOURCE_FILE) - sys.exit(1) + ret['ranger.tagsync.source.impl.class'] = ret[TAG_SOURCE_KEY] + del ret[TAG_SOURCE_KEY] atlasOutFile.close() http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java new file mode 100644 index 0000000..d6baeb2 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.tagsync.model; + +import com.google.gson.Gson; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.plugin.util.ServiceTags; + +public abstract class AbstractTagSource implements TagSource { + private static final Log LOG = LogFactory.getLog(AbstractTagSource.class); + private TagSink tagSink; + protected boolean shutdown = false; + + @Override + public void setTagSink(TagSink sink) { + if (sink == null) { + LOG.error("Sink is null!!!"); + } else { + this.tagSink = sink; + } + } + @Override + public void synchUp() {} + + public void updateSink(final ServiceTags serviceTags) { + if (serviceTags == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("No ServiceTags to upload"); + } + } else { + if (LOG.isDebugEnabled()) { + String serviceTagsJSON = new Gson().toJson(serviceTags); + LOG.debug("Uploading serviceTags=" + serviceTagsJSON); + } + + try { + tagSink.uploadServiceTags(serviceTags); + } catch (Exception exception) { + LOG.error("uploadServiceTags() failed..", exception); + } + } + } + + public void stop() { + shutdown = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java index 2df8036..7d19562 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java @@ -27,9 +27,11 @@ public interface TagSource { void setTagSink(TagSink sink); - void updateSink() throws Exception; + void synchUp(); - Thread start(); + boolean start(); + + void stop(); boolean isChanged(); http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java index e1b5130..0dfea25 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java @@ -47,22 +47,29 @@ public class TagSyncConfig extends Configuration { private static final String TAGSYNC_FILESOURCE_MOD_TIME_CHECK_INTERVAL_PROP = "ranger.tagsync.filesource.modtime.check.interval"; + private static final String TAGSYNC_ATLAS_REST_SOURCE_DOWNLOAD_INTERVAL_PROP = "ranger.tagsync.atlasrestsource.download.interval"; + private static final String TAGSYNC_SOURCE_CLASS_PROP = "ranger.tagsync.source.impl.class"; private static final String TAGSYNC_SINK_CLASS_PROP = "ranger.tagsync.sink.impl.class"; - private static final String TAGSYNC_ATLASSOURCE_ENDPOINT_PROP = "ranger.tagsync.atlassource.endpoint"; + private static final String TAGSYNC_ATLASSOURCE_ENDPOINT_PROP = "ranger.tagsync.atlasrestsource.endpoint"; - private static final String TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX = "ranger.tagsync.atlas."; + public static final String TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX = "ranger.tagsync.atlas."; - private static final String TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX = ".ranger.service"; + public static final String TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX = ".ranger.service"; - private static final String TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR = "_"; + public static final String TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR = "_"; private static final String TAGSYNC_TAGADMIN_KEYSTORE_PROP = "ranger.tagsync.tagadmin.keystore"; private static final String TAGSYNC_TAGADMIN_ALIAS_PROP = "ranger.tagsync.tagadmin.alias"; private static final String TAGSYNC_TAGADMIN_PASSWORD_PROP = "ranger.tagsync.tagadmin.password"; private static final String DEFAULT_TAGADMIN_USERNAME = "rangertagsync"; + private static final String TAGSYNC_SOURCE_ATLAS_CUSTOM_RESOURCE_MAPPERS_PROP = "ranger.tagsync.source.atlas.custom.resource.mappers"; + + private static final long DEFAULT_TAGSYNC_FILESOURCE_MOD_TIME_CHECK_INTERVAL = 60000; + + private static final long DEFAULT_TAGSYNC_REST_SOURCE_DOWNLOAD_INTERVAL = 900000; public static TagSyncConfig getInstance() { TagSyncConfig newConfig = new TagSyncConfig(); @@ -158,7 +165,7 @@ public class TagSyncConfig extends Configuration { static public boolean isTagSyncEnabled(Properties prop) { String val = prop.getProperty(TAGSYNC_ENABLED_PROP); - return !(val != null && val.trim().equalsIgnoreCase("falae")); + return !(val != null && val.trim().equalsIgnoreCase("false")); } static public String getTagSyncLogdir(Properties prop) { @@ -168,17 +175,39 @@ public class TagSyncConfig extends Configuration { static public long getTagSourceFileModTimeCheckIntervalInMillis(Properties prop) { String val = prop.getProperty(TAGSYNC_FILESOURCE_MOD_TIME_CHECK_INTERVAL_PROP); - return Long.valueOf(val); + long ret = DEFAULT_TAGSYNC_FILESOURCE_MOD_TIME_CHECK_INTERVAL; + if (StringUtils.isNotBlank(val)) { + try { + ret = Long.valueOf(val); + } catch (NumberFormatException exception) { + // Ignore + } + } + return ret; + } + + static public long getTagSourceAtlasDownloadIntervalInMillis(Properties prop) { + String val = prop.getProperty(TAGSYNC_ATLAS_REST_SOURCE_DOWNLOAD_INTERVAL_PROP); + long ret = DEFAULT_TAGSYNC_REST_SOURCE_DOWNLOAD_INTERVAL; + if (StringUtils.isNotBlank(val)) { + try { + ret = Long.valueOf(val); + } catch (NumberFormatException exception) { + // Ignore + } + } + return ret; } - static public String getTagSourceClassName(Properties prop) { - String val = prop.getProperty(TAGSYNC_SOURCE_CLASS_PROP); - if (StringUtils.equalsIgnoreCase(val, "atlas")) { - return "org.apache.ranger.tagsync.source.atlas.TagAtlasSource"; - } else if (StringUtils.equalsIgnoreCase(val, "file")) { - return "org.apache.ranger.tagsync.source.file.TagFileSource"; + static public String getTagSourceClassName(String sourceName) { + if (StringUtils.equalsIgnoreCase(sourceName, "atlas")) { + return "org.apache.ranger.tagsync.source.atlas.AtlasTagSource"; + } else if (StringUtils.equalsIgnoreCase(sourceName, "file")) { + return "org.apache.ranger.tagsync.source.file.FileTagSource"; + } else if (StringUtils.equalsIgnoreCase(sourceName, "atlasrest")) { + return "org.apache.ranger.tagsync.source.atlasrest.AtlasRESTTagSource"; } else - return val; + return sourceName; } static public String getTagSource(Properties prop) { @@ -188,7 +217,7 @@ public class TagSyncConfig extends Configuration { static public String getTagSinkClassName(Properties prop) { String val = prop.getProperty(TAGSYNC_SINK_CLASS_PROP); if (StringUtils.equalsIgnoreCase(val, "tagadmin")) { - return "org.apache.ranger.tagsync.sink.tagadmin.TagRESTSink"; + return "org.apache.ranger.tagsync.sink.tagadmin.TagAdminRESTSink"; } else return val; } @@ -250,15 +279,8 @@ public class TagSyncConfig extends Configuration { return ""; } - static public String getServiceName(String componentName, String instanceName, Properties prop) { - String propName = TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX + componentName - + ".instance." + instanceName - + TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX; - String val = prop.getProperty(propName); - if (StringUtils.isBlank(val)) { - val = instanceName + TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR + componentName; - } - return val; + static public String getCustomAtlasResourceMappers(Properties prop) { + return prop.getProperty(TAGSYNC_SOURCE_ATLAS_CUSTOM_RESOURCE_MAPPERS_PROP); } private TagSyncConfig() { http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java index 7bae973..c1a1c39 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java @@ -19,49 +19,50 @@ package org.apache.ranger.tagsync.process; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.ranger.tagsync.model.TagSink; import org.apache.ranger.tagsync.model.TagSource; -import java.util.Map; -import java.util.Properties; +import java.util.*; public class TagSynchronizer { private static final Logger LOG = Logger.getLogger(TagSynchronizer.class); private boolean shutdownFlag = false; - private TagSource tagSource = null; + private List<TagSource> tagSources; private Properties properties = null; public static void main(String[] args) { - boolean tagSynchronizerInitialized = false; TagSynchronizer tagSynchronizer = new TagSynchronizer(); - while (!tagSynchronizerInitialized) { + TagSyncConfig config = TagSyncConfig.getInstance(); + Properties props = config.getProperties(); - TagSyncConfig config = TagSyncConfig.getInstance(); - Properties props = config.getProperties(); + tagSynchronizer.setProperties(props); - tagSynchronizer.setProperties(props); - tagSynchronizerInitialized = tagSynchronizer.initialize(); + String tagSourceName = null; + if (args.length > 0) { + tagSourceName = args[0]; + LOG.info("TagSource is set to " + args[0]); + } - if (!tagSynchronizerInitialized) { - LOG.error("TagSynchronizer failed to initialize correctly"); + boolean tagSynchronizerInitialized = tagSynchronizer.initialize(tagSourceName); - try { - LOG.error("Sleeping for [60] seconds before attempting to re-read configuration XML files"); - Thread.sleep(60 * 1000); - } catch (InterruptedException e) { - LOG.error("Failed to wait for [60] seconds", e); - } + if (tagSynchronizerInitialized) { + if (StringUtils.isNotBlank(tagSourceName)) { + tagSynchronizer.getFirstTagSource().synchUp(); + } else { + tagSynchronizer.run(); } + } else { + LOG.error("TagSynchronizer failed to initialize correctly, exiting.."); + System.exit(-1); } - - tagSynchronizer.run(); } public TagSynchronizer() { @@ -80,64 +81,124 @@ public class TagSynchronizer { } } - public boolean initialize() { + public boolean initialize(String source) { if (LOG.isDebugEnabled()) { - LOG.debug("==> TagSynchronizer.initialize()"); + LOG.debug("==> TagSynchronizer.initialize(" + source + ")"); } printConfigurationProperties(); - boolean ret = true; + boolean ret = false; + + String tagSourceNames = StringUtils.isNotBlank(source) ? source : + TagSyncConfig.getTagSource(properties); + + if (StringUtils.isNotBlank(tagSourceNames)) { + + LOG.info("Initializing TAG source and sink"); - String tagSourceName = TagSyncConfig.getTagSource(properties); + TagSink tagSink = initializeTagSink(); - if (StringUtils.isBlank(tagSourceName) || - (!StringUtils.equalsIgnoreCase(tagSourceName, "file") && !StringUtils.equalsIgnoreCase(tagSourceName, "atlas"))) { - ret = false; - LOG.error("'ranger.tagsync.source.impl.class' value is invalid!, 'ranger.tagsync.source.impl.class'=" + tagSourceName + ". Supported 'ranger.tagsync.source.impl.class' values are : file, atlas"); + if (tagSink != null) { + + tagSources = initializeTagSources(tagSourceNames); + + if (CollectionUtils.isNotEmpty(tagSources)) { + for (TagSource tagSource : tagSources) { + tagSource.setTagSink(tagSink); + } + ret = true; + } + } + } else { + LOG.error("'ranger.tagsync.source.impl.class' value is not specified or is empty!"); } - if (ret) { + if (LOG.isDebugEnabled()) { + LOG.debug("<== TagSynchronizer.initialize(" + tagSourceNames + ") : " + ret); + } + + return ret; + } + + public TagSink initializeTagSink() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagSynchronizer.initializeTagSink()"); + } - try { - LOG.info("Initializing TAG source and sink"); - // Initialize tagSink and tagSource - String tagSourceClassName = TagSyncConfig.getTagSourceClassName(properties); - String tagSinkClassName = TagSyncConfig.getTagSinkClassName(properties); + TagSink ret = null; + try { + String tagSinkClassName = TagSyncConfig.getTagSinkClassName(properties); + + if (LOG.isDebugEnabled()) { + LOG.debug("tagSinkClassName=" + tagSinkClassName); + } + @SuppressWarnings("unchecked") + Class<TagSink> tagSinkClass = (Class<TagSink>) Class.forName(tagSinkClassName); + + ret = tagSinkClass.newInstance(); + + if (!ret.initialize(properties)) { + LOG.error("Failed to initialize TAG sink " + tagSinkClassName); + + ret = null; + } + } catch (Throwable t) { + LOG.error("Failed to initialize TAG sink. Error details: ", t); + ret = null; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== TagSynchronizer.initializeTagSink()"); + } + return ret; + } + + public List<TagSource> initializeTagSources(String tagSourceNames) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagSynchronizer.initializeTagSources(" + tagSourceNames + ")"); + } + + List<TagSource> ret = new ArrayList<TagSource>(); + + String[] tagSourcesArray = tagSourceNames.split(","); + + List<String> tagSourceList = Arrays.asList(tagSourcesArray); + + try { + for (String tagSourceName : tagSourceList) { + + String tagSourceClassName = TagSyncConfig.getTagSourceClassName(tagSourceName.trim()); if (LOG.isDebugEnabled()) { - LOG.debug("tagSourceClassName=" + tagSourceClassName + ", tagSinkClassName=" + tagSinkClassName); + LOG.debug("tagSourceClassName=" + tagSourceClassName); } @SuppressWarnings("unchecked") Class<TagSource> tagSourceClass = (Class<TagSource>) Class.forName(tagSourceClassName); - - @SuppressWarnings("unchecked") - Class<TagSink> tagSinkClass = (Class<TagSink>) Class.forName(tagSinkClassName); - - TagSink tagSink = tagSinkClass.newInstance(); - tagSource = tagSourceClass.newInstance(); + TagSource tagSource = tagSourceClass.newInstance(); if (LOG.isDebugEnabled()) { - LOG.debug("Created instance of " + tagSourceClassName + ", " + tagSinkClassName); + LOG.debug("Created instance of " + tagSourceClassName); } - ret = tagSink.initialize(properties) && tagSource.initialize(properties); + if (!tagSource.initialize(properties)) { + LOG.error("Failed to initialize TAG source " + tagSourceClassName); - if (ret) { - tagSource.setTagSink(tagSink); + ret.clear(); + break; } - - LOG.info("Done initializing TAG source and sink"); - } catch (Throwable t) { - LOG.error("Failed to initialize TAG source/sink. Error details: ", t); - ret = false; + ret.add(tagSource); } + + } catch (Throwable t) { + LOG.error("Failed to initialize TAG sources. Error details: ", t); + ret.clear(); } if (LOG.isDebugEnabled()) { - LOG.debug("<== TagSynchronizer.initialize(), result=" + ret); + LOG.debug("<== TagSynchronizer.initializeTagSources(" + tagSourceNames + ")"); } return ret; @@ -150,35 +211,31 @@ public class TagSynchronizer { long shutdownCheckIntervalInMs = 60*1000; - Thread tagSourceThread = null; + boolean tagSourcesStarted = true; try { - tagSourceThread = tagSource.start(); + for (TagSource tagSource : tagSources) { + tagSourcesStarted = tagSourcesStarted && tagSource.start(); + } - if (tagSourceThread != null) { + if (tagSourcesStarted) { while (!shutdownFlag) { try { LOG.debug("Sleeping for [" + shutdownCheckIntervalInMs + "] milliSeconds"); Thread.sleep(shutdownCheckIntervalInMs); } catch (InterruptedException e) { - LOG.error("Failed to wait for [" + shutdownCheckIntervalInMs + "] milliseconds before attempting to synchronize tag information", e); + LOG.error("Failed to wait for [" + shutdownCheckIntervalInMs + "] milliseconds before attempting to synchronize tag information ", e); + break; } } } } catch (Throwable t) { - LOG.error("tag-sync thread got an error", t); + LOG.error("tag-sync main thread got an error", t); } finally { - if (tagSourceThread != null) { - LOG.error("Shutting down the tag-sync thread"); - LOG.info("Interrupting tagSourceThread..."); - tagSourceThread.interrupt(); - try { - tagSourceThread.join(); - } catch (InterruptedException interruptedException) { - LOG.error("tagSourceThread.join() was interrupted"); - } - } else { - LOG.error("Could not start tagSource monitoring thread"); + LOG.info("Stopping all tagSources"); + + for (TagSource tagSource : tagSources) { + tagSource.stop(); } } @@ -192,6 +249,10 @@ public class TagSynchronizer { this.shutdownFlag = true; } + public TagSource getFirstTagSource() { + return tagSources.get(0); + } + public void printConfigurationProperties() { LOG.info("--------------------------------"); LOG.info(""); http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java new file mode 100644 index 0000000..1541034 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.tagsync.sink.tagadmin; + +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; + +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.admin.client.datatype.RESTResponse; +import org.apache.ranger.tagsync.model.TagSink; +import org.apache.ranger.plugin.util.RangerRESTClient; +import org.apache.ranger.plugin.util.SearchFilter; +import org.apache.ranger.plugin.util.ServiceTags; +import org.apache.ranger.tagsync.process.TagSyncConfig; + +import java.util.Map; +import java.util.Properties; + +public class TagAdminRESTSink implements TagSink { + private static final Log LOG = LogFactory.getLog(TagAdminRESTSink.class); + + private static final String REST_PREFIX = "/service"; + private static final String MODULE_PREFIX = "/tags"; + + private static final String REST_MIME_TYPE_JSON = "application/json" ; + + private static final String REST_URL_IMPORT_SERVICETAGS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/importservicetags/"; + + private RangerRESTClient tagRESTClient = null; + + @Override + public boolean initialize(Properties properties) { + if(LOG.isDebugEnabled()) { + LOG.debug("==> TagAdminRESTSink.initialize()"); + } + + boolean ret = true; + + String restUrl = TagSyncConfig.getTagAdminRESTUrl(properties); + String sslConfigFile = TagSyncConfig.getTagAdminRESTSslConfigFile(properties); + String userName = TagSyncConfig.getTagAdminUserName(properties); + String password = TagSyncConfig.getTagAdminPassword(properties); + + if (LOG.isDebugEnabled()) { + LOG.debug("restUrl=" + restUrl); + LOG.debug("sslConfigFile=" + sslConfigFile); + LOG.debug("userName=" + userName); + } + + if (StringUtils.isBlank(restUrl)) { + ret = false; + LOG.error("No value specified for property 'ranger.tagsync.tagadmin.rest.url'!"); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("ranger.tagsync.tagadmin.rest.url:" + restUrl); + } + } + + if (ret) { + tagRESTClient = new RangerRESTClient(restUrl, sslConfigFile); + tagRESTClient.setBasicAuthInfo(userName, password); + + ret = testConnection(); + } + + if (!ret) { + LOG.error("Cannot connect to Tag Admin. Please recheck configuration properties and/or check if Tag Admin is running and responsive"); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("<== TagAdminRESTSink.initialize(), result=" + ret); + } + + return ret; + } + + public boolean testConnection() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagAdminRESTSink.testConnection()"); + } + + boolean ret = true; + + try { + // build a dummy serviceTags structure and upload it to test connectivity + ServiceTags serviceTags = new ServiceTags(); + serviceTags.setOp(ServiceTags.OP_ADD_OR_UPDATE); + uploadServiceTags(serviceTags); + } catch (Exception exception) { + LOG.error("test-upload of serviceTags failed.", exception); + ret = false; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== TagAdminRESTSink.testConnection(), result=" + ret); + } + return ret; + } + + @Override + synchronized public void uploadServiceTags(ServiceTags serviceTags) throws Exception { + if(LOG.isDebugEnabled()) { + LOG.debug("==> uploadServiceTags()"); + } + WebResource webResource = createWebResource(REST_URL_IMPORT_SERVICETAGS_RESOURCE); + + ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).put(ClientResponse.class, tagRESTClient.toJson(serviceTags)); + + if(response == null || response.getStatus() != 204) { + LOG.error("RangerAdmin REST call returned with response={" + response + "}"); + + RESTResponse resp = RESTResponse.fromClientResponse(response); + + LOG.error("Upload of service-tags failed with message " + resp.getMessage()); + + throw new Exception("Upload of service-tags failed with response: " + response); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("<== uploadServiceTags()"); + } + } + + private WebResource createWebResource(String url) { + return createWebResource(url, null); + } + + private WebResource createWebResource(String url, SearchFilter filter) { + WebResource ret = tagRESTClient.getResource(url); + + if(filter != null && !MapUtils.isEmpty(filter.getParams())) { + for(Map.Entry<String, String> e : filter.getParams().entrySet()) { + String name = e.getKey(); + String value = e.getValue(); + + ret.queryParam(name, value); + } + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java deleted file mode 100644 index 41085d0..0000000 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ranger.tagsync.sink.tagadmin; - -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; - -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ranger.admin.client.datatype.RESTResponse; -import org.apache.ranger.tagsync.model.TagSink; -import org.apache.ranger.plugin.util.RangerRESTClient; -import org.apache.ranger.plugin.util.SearchFilter; -import org.apache.ranger.plugin.util.ServiceTags; -import org.apache.ranger.tagsync.process.TagSyncConfig; - -import java.util.Map; -import java.util.Properties; - -public class TagRESTSink implements TagSink { - private static final Log LOG = LogFactory.getLog(TagRESTSink.class); - - private static final String REST_PREFIX = "/service"; - private static final String MODULE_PREFIX = "/tags"; - - private static final String REST_MIME_TYPE_JSON = "application/json" ; - - private static final String REST_URL_IMPORT_SERVICETAGS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/importservicetags/"; - - private RangerRESTClient tagRESTClient = null; - - @Override - public boolean initialize(Properties properties) { - if(LOG.isDebugEnabled()) { - LOG.debug("==> TagRESTSink.initialize()"); - } - - boolean ret = true; - - String restUrl = TagSyncConfig.getTagAdminRESTUrl(properties); - String sslConfigFile = TagSyncConfig.getTagAdminRESTSslConfigFile(properties); - String userName = TagSyncConfig.getTagAdminUserName(properties); - String password = TagSyncConfig.getTagAdminPassword(properties); - - if (LOG.isDebugEnabled()) { - LOG.debug("restUrl=" + restUrl); - LOG.debug("sslConfigFile=" + sslConfigFile); - LOG.debug("userName=" + userName); - } - - if (StringUtils.isBlank(restUrl)) { - ret = false; - LOG.error("No value specified for property 'ranger.tagsync.tagadmin.rest.url'!"); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("ranger.tagsync.tagadmin.rest.url:" + restUrl); - } - } - - if (ret) { - tagRESTClient = new RangerRESTClient(restUrl, sslConfigFile); - tagRESTClient.setBasicAuthInfo(userName, password); - - ret = testConnection(); - } - - if (!ret) { - LOG.error("Cannot connect to Tag Admin. Please recheck configuration properties and/or check if Tag Admin is running and responsive"); - } - - if(LOG.isDebugEnabled()) { - LOG.debug("<== TagRESTSink.initialize(), result=" + ret); - } - - return ret; - } - - public boolean testConnection() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagRESTSink.testConnection()"); - } - - boolean ret = true; - - try { - // build a dummy serviceTags structure and upload it to test connectivity - ServiceTags serviceTags = new ServiceTags(); - serviceTags.setOp(ServiceTags.OP_ADD_OR_UPDATE); - uploadServiceTags(serviceTags); - } catch (Exception exception) { - LOG.error("test-upload of serviceTags failed.", exception); - ret = false; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagRESTSink.testConnection(), result=" + ret); - } - return ret; - } - - @Override - public void uploadServiceTags(ServiceTags serviceTags) throws Exception { - if(LOG.isDebugEnabled()) { - LOG.debug("==> uploadServiceTags()"); - } - WebResource webResource = createWebResource(REST_URL_IMPORT_SERVICETAGS_RESOURCE); - - ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).put(ClientResponse.class, tagRESTClient.toJson(serviceTags)); - - if(response == null || response.getStatus() != 204) { - LOG.error("RangerAdmin REST call returned with response={" + response + "}"); - - RESTResponse resp = RESTResponse.fromClientResponse(response); - - LOG.error("Upload of service-tags failed with message " + resp.getMessage()); - } - - if(LOG.isDebugEnabled()) { - LOG.debug("<== uploadServiceTags()"); - } - } - - private WebResource createWebResource(String url) { - return createWebResource(url, null); - } - - private WebResource createWebResource(String url, SearchFilter filter) { - WebResource ret = tagRESTClient.getResource(url); - - if(filter != null && !MapUtils.isEmpty(filter.getParams())) { - for(Map.Entry<String, String> e : filter.getParams().entrySet()) { - String name = e.getKey(); - String value = e.getValue(); - - ret.queryParam(name, value); - } - } - - return ret; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java new file mode 100644 index 0000000..a17d611 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.tagsync.source.atlas; + +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.plugin.model.RangerPolicy; +import org.apache.ranger.plugin.model.RangerServiceResource; +import org.apache.ranger.tagsync.process.TagSyncConfig; + +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; + +public class AtlasHiveResourceMapper extends AtlasResourceMapper { + private static final Log LOG = LogFactory.getLog(AtlasHiveResourceMapper.class); + + public static final String COMPONENT_NAME = "hive"; + + public static final String ENTITY_TYPE_HIVE_DB = "hive_db"; + public static final String ENTITY_TYPE_HIVE_TABLE = "hive_table"; + public static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column"; + + public static final String RANGER_TYPE_HIVE_DB = "database"; + public static final String RANGER_TYPE_HIVE_TABLE = "table"; + public static final String RANGER_TYPE_HIVE_COLUMN = "column"; + + public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE = "name"; + + static protected final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + + private static String clusterDelimiter = "@"; + + private static String qualifiedNameDelimiter = "\\."; + + public static final String[] supportedEntityTypes = { ENTITY_TYPE_HIVE_DB, ENTITY_TYPE_HIVE_TABLE, ENTITY_TYPE_HIVE_COLUMN }; + + public AtlasHiveResourceMapper() { + super(); + } + + @Override + public List<String> getSupportedEntityTypes() { + return Arrays.asList(supportedEntityTypes); + } + + @Override + public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception { + + Map<String, RangerPolicy.RangerPolicyResource> elements = new HashMap<String, RangerPolicy.RangerPolicyResource>(); + + String serviceName = null; + + List<String> components = getQualifiedNameComponents(entity); + // components should contain qualifiedName, clusterName, dbName, tableName, columnName in that order + + String entityTypeName = entity.getTypeName(); + + String qualifiedName = components.get(0); + + String clusterName, dbName, tableName, columnName; + + if (components.size() > 1) { + clusterName = components.get(1); + serviceName = getRangerServiceName(clusterName); + } + + if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_DB)) { + if (components.size() > 2) { + dbName = components.get(2); + RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); + elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); + + } else { + LOG.error("invalid qualifiedName for HIVE_DB, qualifiedName=" + qualifiedName); + } + } else if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE)) { + if (components.size() > 3) { + dbName = components.get(2); + tableName = components.get(3); + RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); + elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); + RangerPolicy.RangerPolicyResource tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName); + elements.put(RANGER_TYPE_HIVE_TABLE, tablePolicyResource); + } else { + LOG.error("invalid qualifiedName for HIVE_TABLE, qualifiedName=" + qualifiedName); + } + } else if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_COLUMN)) { + if (components.size() > 4) { + dbName = components.get(2); + tableName = components.get(3); + columnName = components.get(4); + RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); + elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); + RangerPolicy.RangerPolicyResource tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName); + elements.put(RANGER_TYPE_HIVE_TABLE, tablePolicyResource); + RangerPolicy.RangerPolicyResource columnPolicyResource = new RangerPolicy.RangerPolicyResource(columnName); + elements.put(RANGER_TYPE_HIVE_COLUMN, columnPolicyResource); + } else { + LOG.error("invalid qualifiedName for HIVE_COLUMN, qualifiedName=" + qualifiedName); + } + + } + + RangerServiceResource ret = new RangerServiceResource(); + + ret.setGuid(entity.getId()._getId()); + ret.setServiceName(serviceName); + ret.setResourceElements(elements); + + return ret; + } + + public String getRangerServiceName(String clusterName) { + String ret = getRangerServiceName(COMPONENT_NAME, clusterName); + + if (StringUtils.isBlank(ret)) { + ret = clusterName + TagSyncConfig.TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR + COMPONENT_NAME; + } + return ret; + } + + public final List<String> getQualifiedNameComponents(IReferenceableInstance entity) throws Exception { + + String qualifiedNameAttributeName = getQualifiedNameAttributeName(entity.getTypeName()); + + String qualifiedName = getEntityAttribute(entity, qualifiedNameAttributeName, String.class); + + List<String> ret = getQualifiedNameComponents(entity.getTypeName(), qualifiedName); + + if (LOG.isDebugEnabled()) { + LOG.debug("----- Entity-Id:" + entity.getId()._getId()); + LOG.debug("----- Entity-Type-Name:" + entity.getTypeName()); + LOG.debug("----- Entity-Components -----"); + int i = 0; + for (String value : ret) { + LOG.debug("----- Index:" + i++ + " Value:" + value); + } + } + return ret; + } + + public final List<String> getQualifiedNameComponents(String entityTypeName, String qualifiedName) throws Exception { + + String qualifiedNameAttributeName = getQualifiedNameAttributeName(entityTypeName); + + if (StringUtils.isBlank(qualifiedName)) { + throw new Exception("Could not get a valid value for " + qualifiedNameAttributeName + " attribute from entity."); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Received .... " + qualifiedNameAttributeName + "=" + qualifiedName + " for entity type " + entityTypeName); + } + + String components[] = qualifiedName.split(clusterDelimiter); + + if (components.length != 2) { + throw new Exception("Qualified Name does not contain cluster-name, qualifiedName=" + qualifiedName); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("name-hierarchy=" + components[0] + ", cluster-name=" + components[1]); + } + + String nameHierarchy[] = components[0].split(qualifiedNameDelimiter); + + List<String> ret = new ArrayList<String>(); + + ret.add(qualifiedName); + ret.add(components[1]); + + ret.addAll(Arrays.asList(nameHierarchy)); + + return ret; + } + + public String getQualifiedNameAttributeName(String entityTypeName) { + return StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE) ? + ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE : ENTITY_ATTRIBUTE_QUALIFIED_NAME; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java new file mode 100644 index 0000000..fd94928 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.tagsync.source.atlas; + +import org.apache.atlas.AtlasException; +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.plugin.model.RangerServiceResource; +import org.apache.ranger.tagsync.process.TagSyncConfig; + +import java.util.Properties; +import java.util.List; +import java.util.Map; + +public abstract class AtlasResourceMapper { + private static final Log LOG = LogFactory.getLog(AtlasResourceMapper.class); + + protected Properties properties; + + public AtlasResourceMapper() { + } + + public void initialize(Properties properties) { + this.properties = properties; + } + + abstract public List<String> getSupportedEntityTypes(); + + abstract public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception; + + + protected String getRangerServiceName(String componentName, String atlasInstanceName) { + String propName = TagSyncConfig.TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX + componentName + + ".instance." + atlasInstanceName + + TagSyncConfig.TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX; + + return properties.getProperty(propName); + } + + static protected <T> T getEntityAttribute(IReferenceableInstance entity, String name, Class<T> type) { + T ret = null; + + try { + Map<String, Object> valueMap = entity.getValuesMap(); + ret = getAttribute(valueMap, name, type); + } catch (AtlasException exception) { + LOG.error("Cannot get map of values for entity: " + entity.getId()._getId(), exception); + } + + return ret; + } + + static protected <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) { + return type.cast(map.get(name)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java new file mode 100644 index 0000000..f05d814 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for th + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.tagsync.source.atlas; + +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.commons.lang.StringUtils; +import org.apache.ranger.plugin.model.RangerServiceResource; + +import java.util.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.tagsync.process.TagSyncConfig; + +public class AtlasResourceMapperUtil { + private static final Log LOG = LogFactory.getLog(AtlasResourceMapperUtil.class); + + private static Map<String, AtlasResourceMapper> atlasResourceMappers = new HashMap<String, AtlasResourceMapper>(); + + public static boolean isEntityTypeHandled(String entityTypeName) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> isEntityTypeHandled(entityTypeName=" + entityTypeName + ")"); + } + + AtlasResourceMapper mapper = atlasResourceMappers.get(entityTypeName); + + boolean ret = mapper != null; + + if (LOG.isDebugEnabled()) { + LOG.debug("<== isEntityTypeHandled(entityTypeName=" + entityTypeName + ") : " + ret); + } + + return ret; + } + + public static RangerServiceResource getRangerServiceResource(IReferenceableInstance atlasEntity) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getRangerServiceResource(" + atlasEntity.getId()._getId() +")"); + } + + RangerServiceResource resource = null; + + AtlasResourceMapper mapper = atlasResourceMappers.get(atlasEntity.getTypeName()); + + if (mapper != null) { + try { + resource = mapper.buildResource(atlasEntity); + } catch (Exception exception) { + LOG.error("Could not get serviceResource for atlas entity:" + atlasEntity.getId()._getId() + ": ", exception); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getRangerServiceResource(" + atlasEntity.getId()._getId() +"): resource=" + resource); + } + + return resource; + } + + static public boolean initializeAtlasResourceMappers(Properties properties) { + final String MAPPER_NAME_DELIMIER = ","; + + String customMapperNames = TagSyncConfig.getCustomAtlasResourceMappers(properties); + + if (LOG.isDebugEnabled()) { + LOG.debug("==> initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + customMapperNames + ")"); + } + boolean ret = true; + + String allResourceMappers = "org.apache.ranger.tagsync.source.atlas.AtlasHiveResourceMapper"; + + if (StringUtils.isNotBlank(customMapperNames)) { + allResourceMappers = allResourceMappers + MAPPER_NAME_DELIMIER + customMapperNames; + } + + String[] mapperNamesArray = allResourceMappers.split(MAPPER_NAME_DELIMIER); + + List<String> mapperNames = Arrays.asList(mapperNamesArray); + + for (String mapperName : mapperNames) { + mapperName = mapperName.trim(); + try { + Class clazz = Class.forName(mapperName); + AtlasResourceMapper resourceMapper = (AtlasResourceMapper) clazz.newInstance(); + + resourceMapper.initialize(properties); + + for (String entityTypeName : resourceMapper.getSupportedEntityTypes()) { + add(entityTypeName, resourceMapper); + } + + } catch (Exception exception) { + LOG.error("Failed to create AtlasResourceMapper:" + mapperName + ": ", exception); + ret = false; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + allResourceMappers + "): " + ret); + } + return ret; + } + + private static void add(String entityType, AtlasResourceMapper mapper) { + atlasResourceMappers.put(entityType, mapper); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java new file mode 100644 index 0000000..7694b37 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.tagsync.source.atlas; + + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Provider; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.atlas.notification.NotificationConsumer; +import org.apache.atlas.notification.NotificationInterface; +import org.apache.atlas.notification.NotificationModule; +import org.apache.atlas.notification.entity.EntityNotification; + +import org.apache.ranger.tagsync.model.AbstractTagSource; +import org.apache.ranger.plugin.util.ServiceTags; + +import java.io.IOException; +import java.io.InputStream; +import java.util.*; + +public class AtlasTagSource extends AbstractTagSource { + private static final Log LOG = LogFactory.getLog(AtlasTagSource.class); + + public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "application.properties"; + + public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.kafka.bootstrap.servers"; + public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect"; + public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id"; + + private ConsumerRunnable consumerTask; + + @Override + public boolean initialize(Properties properties) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasTagSource.initialize()"); + } + + Properties atlasProperties = new Properties(); + + boolean ret = AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties); + + if (ret) { + + InputStream inputStream = getClass().getClassLoader().getResourceAsStream(TAGSYNC_ATLAS_PROPERTIES_FILE_NAME); + + if (inputStream != null) { + try { + atlasProperties.load(inputStream); + } catch (Exception exception) { + ret = false; + LOG.error("Cannot load Atlas application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME, exception); + } finally { + try { + inputStream.close(); + } catch (IOException ioException) { + LOG.error("Cannot close Atlas application properties file, file-name:\" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME", ioException); + } + } + } else { + ret = false; + LOG.error("Cannot find Atlas application properties file"); + } + } + + if (ret) { + if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_KAFKA_ENDPOINTS))) { + ret = false; + LOG.error("Value of property '" + TAGSYNC_ATLAS_KAFKA_ENDPOINTS + "' is not specified!"); + } + if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT))) { + ret = false; + LOG.error("Value of property '" + TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT + "' is not specified!"); + } + if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_CONSUMER_GROUP))) { + ret = false; + LOG.error("Value of property '" + TAGSYNC_ATLAS_CONSUMER_GROUP + "' is not specified!"); + } + } + + if (ret) { + NotificationModule notificationModule = new NotificationModule(); + + Injector injector = Guice.createInjector(notificationModule); + + Provider<NotificationInterface> consumerProvider = injector.getProvider(NotificationInterface.class); + NotificationInterface notification = consumerProvider.get(); + List<NotificationConsumer<EntityNotification>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); + + consumerTask = new ConsumerRunnable(iterators.get(0)); + + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasTagSource.initialize(), result=" + ret); + } + return ret; + } + + @Override + public boolean start() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasTagSource.start()"); + } + Thread consumerThread = null; + if (consumerTask == null) { + LOG.error("No consumerTask!!!"); + } else { + consumerThread = new Thread(consumerTask); + consumerThread.setDaemon(true); + consumerThread.start(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasTagSource.start()"); + } + return consumerThread != null; + } + + @Override + public boolean isChanged() { + return true; + } + + private static void printEntityNotification(EntityNotification notification) { + if (LOG.isDebugEnabled()) { + LOG.debug("Notification-Type: " + notification.getOperationType()); + AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(notification.getEntity(), notification.getAllTraits()); + LOG.debug(entityWithTraits); + } + } + + private class ConsumerRunnable implements Runnable { + + private final NotificationConsumer<EntityNotification> consumer; + + private ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) { + this.consumer = consumer; + } + + private boolean hasNext() { + boolean ret = false; + try { + ret = consumer.hasNext(); + } catch (Exception exception) { + LOG.error("EntityNotification consumer threw exception, IGNORING...:", exception); + } + return ret; + } + + @Override + public void run() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> ConsumerRunnable.run()"); + } + while (!shutdown) { + if (hasNext()) { + EntityNotification notification = consumer.next(); + if (notification != null) { + printEntityNotification(notification); + + ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification); + if (serviceTags == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Did not create ServiceTags structure for notification type:" + notification.getOperationType()); + } + } else { + updateSink(serviceTags); + } + } + } + } + LOG.info("Shutting down the Tag-Atlas-source thread"); + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java new file mode 100644 index 0000000..e6a4b36 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.tagsync.source.atlas; + +import org.apache.atlas.AtlasException; +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.atlas.typesystem.IStruct; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class AtlasEntityWithTraits { + + private final IReferenceableInstance entity; + private final List<IStruct> traits; + + public AtlasEntityWithTraits(IReferenceableInstance entity, List<IStruct> traits) { + this.entity = entity; + this.traits = traits; + } + + public IReferenceableInstance getEntity() { + return entity; + } + + public List<IStruct> getAllTraits() { + return traits == null ? new LinkedList<IStruct>() : traits; + } + + @Override + public String toString( ) { + StringBuilder sb = new StringBuilder(); + + toString(sb); + + return sb.toString(); + } + + public void toString(StringBuilder sb) { + + sb.append("AtlasEntityWithTraits={ "); + + sb.append("Entity-Id: " + entity.getId()._getId()).append(", ") + .append("Entity-Type: " + entity.getTypeName()).append(", "); + + sb.append("Entity-Values={ "); + try { + for (Map.Entry<String, Object> entry : entity.getValuesMap().entrySet()) { + sb.append("{").append(entry.getKey()).append(", ").append(entry.getValue()).append("}, "); + } + } catch (AtlasException exception) { + // Ignore + } + sb.append(" }"); + + sb.append(", Entity-Traits={ "); + for (IStruct trait : traits) { + try { + sb.append("{traitType=").append(trait.getTypeName()).append(", "); + Map<String, Object> traitValues = trait.getValuesMap(); + sb.append("{"); + for (Map.Entry<String, Object> valueEntry : traitValues.entrySet()) { + sb.append("{").append(valueEntry.getKey()).append(", ").append(valueEntry.getValue()).append("}"); + } + sb.append("}"); + + sb.append(" }"); + } catch (AtlasException exception) { + // Ignore + } + } + sb.append(" }"); + + sb.append(" }"); + + } + +}
