Repository: incubator-ranger Updated Branches: refs/heads/master ffb895e2c -> 9ea1d4ad7
RANGER-807: tagsync fixes to comply with good coding practices Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/9ea1d4ad Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/9ea1d4ad Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/9ea1d4ad Branch: refs/heads/master Commit: 9ea1d4ad74b1ac7ffc0530aefadf87ffb4f33ec2 Parents: ffb895e Author: Madhan Neethiraj <[email protected]> Authored: Tue Jan 12 16:30:35 2016 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Tue Jan 12 16:58:36 2016 -0800 ---------------------------------------------------------------------- .../source/Atlas/AtlasHiveResourceMapper.java | 203 ------------------ .../source/Atlas/AtlasResourceMapper.java | 74 ------- .../source/Atlas/AtlasResourceMapperUtil.java | 124 ----------- .../tagsync/source/Atlas/AtlasTagSource.java | 197 ------------------ .../source/atlas/AtlasHiveResourceMapper.java | 206 +++++++++++++++++++ .../source/atlas/AtlasNotificationMapper.java | 190 ++++++++--------- .../source/atlas/AtlasResourceMapper.java | 74 +++++++ .../source/atlas/AtlasResourceMapperUtil.java | 124 +++++++++++ .../tagsync/source/atlas/AtlasTagSource.java | 199 ++++++++++++++++++ 9 files changed, 700 insertions(+), 691 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java deleted file mode 100644 index a17d611..0000000 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java +++ 
/dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ranger.tagsync.source.atlas; - -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ranger.plugin.model.RangerPolicy; -import org.apache.ranger.plugin.model.RangerServiceResource; -import org.apache.ranger.tagsync.process.TagSyncConfig; - -import java.util.Map; -import java.util.HashMap; -import java.util.List; -import java.util.ArrayList; -import java.util.Arrays; - -public class AtlasHiveResourceMapper extends AtlasResourceMapper { - private static final Log LOG = LogFactory.getLog(AtlasHiveResourceMapper.class); - - public static final String COMPONENT_NAME = "hive"; - - public static final String ENTITY_TYPE_HIVE_DB = "hive_db"; - public static final String ENTITY_TYPE_HIVE_TABLE = "hive_table"; - public static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column"; - - public static final String RANGER_TYPE_HIVE_DB = "database"; - public static final String RANGER_TYPE_HIVE_TABLE = "table"; - public static final String RANGER_TYPE_HIVE_COLUMN = "column"; 
- - public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE = "name"; - - static protected final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; - - private static String clusterDelimiter = "@"; - - private static String qualifiedNameDelimiter = "\\."; - - public static final String[] supportedEntityTypes = { ENTITY_TYPE_HIVE_DB, ENTITY_TYPE_HIVE_TABLE, ENTITY_TYPE_HIVE_COLUMN }; - - public AtlasHiveResourceMapper() { - super(); - } - - @Override - public List<String> getSupportedEntityTypes() { - return Arrays.asList(supportedEntityTypes); - } - - @Override - public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception { - - Map<String, RangerPolicy.RangerPolicyResource> elements = new HashMap<String, RangerPolicy.RangerPolicyResource>(); - - String serviceName = null; - - List<String> components = getQualifiedNameComponents(entity); - // components should contain qualifiedName, clusterName, dbName, tableName, columnName in that order - - String entityTypeName = entity.getTypeName(); - - String qualifiedName = components.get(0); - - String clusterName, dbName, tableName, columnName; - - if (components.size() > 1) { - clusterName = components.get(1); - serviceName = getRangerServiceName(clusterName); - } - - if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_DB)) { - if (components.size() > 2) { - dbName = components.get(2); - RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); - elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); - - } else { - LOG.error("invalid qualifiedName for HIVE_DB, qualifiedName=" + qualifiedName); - } - } else if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE)) { - if (components.size() > 3) { - dbName = components.get(2); - tableName = components.get(3); - RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); - elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); - 
RangerPolicy.RangerPolicyResource tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName); - elements.put(RANGER_TYPE_HIVE_TABLE, tablePolicyResource); - } else { - LOG.error("invalid qualifiedName for HIVE_TABLE, qualifiedName=" + qualifiedName); - } - } else if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_COLUMN)) { - if (components.size() > 4) { - dbName = components.get(2); - tableName = components.get(3); - columnName = components.get(4); - RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); - elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); - RangerPolicy.RangerPolicyResource tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName); - elements.put(RANGER_TYPE_HIVE_TABLE, tablePolicyResource); - RangerPolicy.RangerPolicyResource columnPolicyResource = new RangerPolicy.RangerPolicyResource(columnName); - elements.put(RANGER_TYPE_HIVE_COLUMN, columnPolicyResource); - } else { - LOG.error("invalid qualifiedName for HIVE_COLUMN, qualifiedName=" + qualifiedName); - } - - } - - RangerServiceResource ret = new RangerServiceResource(); - - ret.setGuid(entity.getId()._getId()); - ret.setServiceName(serviceName); - ret.setResourceElements(elements); - - return ret; - } - - public String getRangerServiceName(String clusterName) { - String ret = getRangerServiceName(COMPONENT_NAME, clusterName); - - if (StringUtils.isBlank(ret)) { - ret = clusterName + TagSyncConfig.TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR + COMPONENT_NAME; - } - return ret; - } - - public final List<String> getQualifiedNameComponents(IReferenceableInstance entity) throws Exception { - - String qualifiedNameAttributeName = getQualifiedNameAttributeName(entity.getTypeName()); - - String qualifiedName = getEntityAttribute(entity, qualifiedNameAttributeName, String.class); - - List<String> ret = getQualifiedNameComponents(entity.getTypeName(), qualifiedName); - - if (LOG.isDebugEnabled()) { - 
LOG.debug("----- Entity-Id:" + entity.getId()._getId()); - LOG.debug("----- Entity-Type-Name:" + entity.getTypeName()); - LOG.debug("----- Entity-Components -----"); - int i = 0; - for (String value : ret) { - LOG.debug("----- Index:" + i++ + " Value:" + value); - } - } - return ret; - } - - public final List<String> getQualifiedNameComponents(String entityTypeName, String qualifiedName) throws Exception { - - String qualifiedNameAttributeName = getQualifiedNameAttributeName(entityTypeName); - - if (StringUtils.isBlank(qualifiedName)) { - throw new Exception("Could not get a valid value for " + qualifiedNameAttributeName + " attribute from entity."); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Received .... " + qualifiedNameAttributeName + "=" + qualifiedName + " for entity type " + entityTypeName); - } - - String components[] = qualifiedName.split(clusterDelimiter); - - if (components.length != 2) { - throw new Exception("Qualified Name does not contain cluster-name, qualifiedName=" + qualifiedName); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("name-hierarchy=" + components[0] + ", cluster-name=" + components[1]); - } - - String nameHierarchy[] = components[0].split(qualifiedNameDelimiter); - - List<String> ret = new ArrayList<String>(); - - ret.add(qualifiedName); - ret.add(components[1]); - - ret.addAll(Arrays.asList(nameHierarchy)); - - return ret; - } - - public String getQualifiedNameAttributeName(String entityTypeName) { - return StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE) ? 
- ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE : ENTITY_ATTRIBUTE_QUALIFIED_NAME; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java deleted file mode 100644 index fd94928..0000000 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.ranger.tagsync.source.atlas; - -import org.apache.atlas.AtlasException; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ranger.plugin.model.RangerServiceResource; -import org.apache.ranger.tagsync.process.TagSyncConfig; - -import java.util.Properties; -import java.util.List; -import java.util.Map; - -public abstract class AtlasResourceMapper { - private static final Log LOG = LogFactory.getLog(AtlasResourceMapper.class); - - protected Properties properties; - - public AtlasResourceMapper() { - } - - public void initialize(Properties properties) { - this.properties = properties; - } - - abstract public List<String> getSupportedEntityTypes(); - - abstract public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception; - - - protected String getRangerServiceName(String componentName, String atlasInstanceName) { - String propName = TagSyncConfig.TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX + componentName - + ".instance." 
+ atlasInstanceName - + TagSyncConfig.TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX; - - return properties.getProperty(propName); - } - - static protected <T> T getEntityAttribute(IReferenceableInstance entity, String name, Class<T> type) { - T ret = null; - - try { - Map<String, Object> valueMap = entity.getValuesMap(); - ret = getAttribute(valueMap, name, type); - } catch (AtlasException exception) { - LOG.error("Cannot get map of values for entity: " + entity.getId()._getId(), exception); - } - - return ret; - } - - static protected <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) { - return type.cast(map.get(name)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java deleted file mode 100644 index f05d814..0000000 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for th - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ranger.tagsync.source.atlas; - -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.commons.lang.StringUtils; -import org.apache.ranger.plugin.model.RangerServiceResource; - -import java.util.*; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ranger.tagsync.process.TagSyncConfig; - -public class AtlasResourceMapperUtil { - private static final Log LOG = LogFactory.getLog(AtlasResourceMapperUtil.class); - - private static Map<String, AtlasResourceMapper> atlasResourceMappers = new HashMap<String, AtlasResourceMapper>(); - - public static boolean isEntityTypeHandled(String entityTypeName) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> isEntityTypeHandled(entityTypeName=" + entityTypeName + ")"); - } - - AtlasResourceMapper mapper = atlasResourceMappers.get(entityTypeName); - - boolean ret = mapper != null; - - if (LOG.isDebugEnabled()) { - LOG.debug("<== isEntityTypeHandled(entityTypeName=" + entityTypeName + ") : " + ret); - } - - return ret; - } - - public static RangerServiceResource getRangerServiceResource(IReferenceableInstance atlasEntity) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> getRangerServiceResource(" + atlasEntity.getId()._getId() +")"); - } - - RangerServiceResource resource = null; - - AtlasResourceMapper mapper = atlasResourceMappers.get(atlasEntity.getTypeName()); - - if (mapper != null) { - try { - resource = mapper.buildResource(atlasEntity); - } catch (Exception exception) { - LOG.error("Could not get serviceResource for atlas entity:" + atlasEntity.getId()._getId() + ": ", exception); - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== getRangerServiceResource(" + atlasEntity.getId()._getId() +"): resource=" + resource); - } - - return resource; - } - - static public boolean initializeAtlasResourceMappers(Properties 
properties) { - final String MAPPER_NAME_DELIMIER = ","; - - String customMapperNames = TagSyncConfig.getCustomAtlasResourceMappers(properties); - - if (LOG.isDebugEnabled()) { - LOG.debug("==> initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + customMapperNames + ")"); - } - boolean ret = true; - - String allResourceMappers = "org.apache.ranger.tagsync.source.atlas.AtlasHiveResourceMapper"; - - if (StringUtils.isNotBlank(customMapperNames)) { - allResourceMappers = allResourceMappers + MAPPER_NAME_DELIMIER + customMapperNames; - } - - String[] mapperNamesArray = allResourceMappers.split(MAPPER_NAME_DELIMIER); - - List<String> mapperNames = Arrays.asList(mapperNamesArray); - - for (String mapperName : mapperNames) { - mapperName = mapperName.trim(); - try { - Class clazz = Class.forName(mapperName); - AtlasResourceMapper resourceMapper = (AtlasResourceMapper) clazz.newInstance(); - - resourceMapper.initialize(properties); - - for (String entityTypeName : resourceMapper.getSupportedEntityTypes()) { - add(entityTypeName, resourceMapper); - } - - } catch (Exception exception) { - LOG.error("Failed to create AtlasResourceMapper:" + mapperName + ": ", exception); - ret = false; - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + allResourceMappers + "): " + ret); - } - return ret; - } - - private static void add(String entityType, AtlasResourceMapper mapper) { - atlasResourceMappers.put(entityType, mapper); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java deleted file mode 100644 index 7694b37..0000000 --- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ranger.tagsync.source.atlas; - - -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Provider; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.atlas.notification.NotificationConsumer; -import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.notification.NotificationModule; -import org.apache.atlas.notification.entity.EntityNotification; - -import org.apache.ranger.tagsync.model.AbstractTagSource; -import org.apache.ranger.plugin.util.ServiceTags; - -import java.io.IOException; -import java.io.InputStream; -import java.util.*; - -public class AtlasTagSource extends AbstractTagSource { - private static final Log LOG = LogFactory.getLog(AtlasTagSource.class); - - public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "application.properties"; - - public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.kafka.bootstrap.servers"; - public 
static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect"; - public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id"; - - private ConsumerRunnable consumerTask; - - @Override - public boolean initialize(Properties properties) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasTagSource.initialize()"); - } - - Properties atlasProperties = new Properties(); - - boolean ret = AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties); - - if (ret) { - - InputStream inputStream = getClass().getClassLoader().getResourceAsStream(TAGSYNC_ATLAS_PROPERTIES_FILE_NAME); - - if (inputStream != null) { - try { - atlasProperties.load(inputStream); - } catch (Exception exception) { - ret = false; - LOG.error("Cannot load Atlas application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME, exception); - } finally { - try { - inputStream.close(); - } catch (IOException ioException) { - LOG.error("Cannot close Atlas application properties file, file-name:\" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME", ioException); - } - } - } else { - ret = false; - LOG.error("Cannot find Atlas application properties file"); - } - } - - if (ret) { - if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_KAFKA_ENDPOINTS))) { - ret = false; - LOG.error("Value of property '" + TAGSYNC_ATLAS_KAFKA_ENDPOINTS + "' is not specified!"); - } - if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT))) { - ret = false; - LOG.error("Value of property '" + TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT + "' is not specified!"); - } - if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_CONSUMER_GROUP))) { - ret = false; - LOG.error("Value of property '" + TAGSYNC_ATLAS_CONSUMER_GROUP + "' is not specified!"); - } - } - - if (ret) { - NotificationModule notificationModule = new NotificationModule(); - - Injector injector = Guice.createInjector(notificationModule); - - 
Provider<NotificationInterface> consumerProvider = injector.getProvider(NotificationInterface.class); - NotificationInterface notification = consumerProvider.get(); - List<NotificationConsumer<EntityNotification>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); - - consumerTask = new ConsumerRunnable(iterators.get(0)); - - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasTagSource.initialize(), result=" + ret); - } - return ret; - } - - @Override - public boolean start() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasTagSource.start()"); - } - Thread consumerThread = null; - if (consumerTask == null) { - LOG.error("No consumerTask!!!"); - } else { - consumerThread = new Thread(consumerTask); - consumerThread.setDaemon(true); - consumerThread.start(); - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasTagSource.start()"); - } - return consumerThread != null; - } - - @Override - public boolean isChanged() { - return true; - } - - private static void printEntityNotification(EntityNotification notification) { - if (LOG.isDebugEnabled()) { - LOG.debug("Notification-Type: " + notification.getOperationType()); - AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(notification.getEntity(), notification.getAllTraits()); - LOG.debug(entityWithTraits); - } - } - - private class ConsumerRunnable implements Runnable { - - private final NotificationConsumer<EntityNotification> consumer; - - private ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) { - this.consumer = consumer; - } - - private boolean hasNext() { - boolean ret = false; - try { - ret = consumer.hasNext(); - } catch (Exception exception) { - LOG.error("EntityNotification consumer threw exception, IGNORING...:", exception); - } - return ret; - } - - @Override - public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> ConsumerRunnable.run()"); - } - while (!shutdown) { - if (hasNext()) { - EntityNotification 
notification = consumer.next(); - if (notification != null) { - printEntityNotification(notification); - - ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification); - if (serviceTags == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Did not create ServiceTags structure for notification type:" + notification.getOperationType()); - } - } else { - updateSink(serviceTags); - } - } - } - } - LOG.info("Shutting down the Tag-Atlas-source thread"); - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java new file mode 100644 index 0000000..9a6fc13 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ranger.tagsync.source.atlas; + +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.plugin.model.RangerPolicy; +import org.apache.ranger.plugin.model.RangerServiceResource; +import org.apache.ranger.tagsync.process.TagSyncConfig; + +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; + +public class AtlasHiveResourceMapper extends AtlasResourceMapper { + private static final Log LOG = LogFactory.getLog(AtlasHiveResourceMapper.class); + + public static final String COMPONENT_NAME = "hive"; + + public static final String ENTITY_TYPE_HIVE_DB = "hive_db"; + public static final String ENTITY_TYPE_HIVE_TABLE = "hive_table"; + public static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column"; + + public static final String RANGER_TYPE_HIVE_DB = "database"; + public static final String RANGER_TYPE_HIVE_TABLE = "table"; + public static final String RANGER_TYPE_HIVE_COLUMN = "column"; + + public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE = "name"; + + static protected final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + + private static String clusterDelimiter = "@"; + + private static String qualifiedNameDelimiter = "\\."; + + public static final String[] supportedEntityTypes = { ENTITY_TYPE_HIVE_DB, ENTITY_TYPE_HIVE_TABLE, ENTITY_TYPE_HIVE_COLUMN }; + + public AtlasHiveResourceMapper() { + super(); + } + + @Override + public List<String> getSupportedEntityTypes() { + return Arrays.asList(supportedEntityTypes); + } + + @Override + public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception { + + Map<String, RangerPolicy.RangerPolicyResource> elements = new HashMap<String, RangerPolicy.RangerPolicyResource>(); + + String serviceName = 
null; + + List<String> components = getQualifiedNameComponents(entity); + // components should contain qualifiedName, clusterName, dbName, tableName, columnName in that order + + String entityTypeName = entity.getTypeName(); + + String qualifiedName = components.get(0); + + String clusterName, dbName, tableName, columnName; + + if (components.size() > 1) { + clusterName = components.get(1); + serviceName = getRangerServiceName(clusterName); + } + + if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_DB)) { + if (components.size() > 2) { + dbName = components.get(2); + RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); + elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); + + } else { + LOG.error("invalid qualifiedName for HIVE_DB, qualifiedName=" + qualifiedName); + throw new Exception("invalid qualifiedName for HIVE_DB, qualifiedName=" + qualifiedName); + } + } else if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE)) { + if (components.size() > 3) { + dbName = components.get(2); + tableName = components.get(3); + RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); + elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); + RangerPolicy.RangerPolicyResource tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName); + elements.put(RANGER_TYPE_HIVE_TABLE, tablePolicyResource); + } else { + LOG.error("invalid qualifiedName for HIVE_TABLE, qualifiedName=" + qualifiedName); + throw new Exception("invalid qualifiedName for HIVE_TABLE, qualifiedName=" + qualifiedName); + } + } else if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_COLUMN)) { + if (components.size() > 4) { + dbName = components.get(2); + tableName = components.get(3); + columnName = components.get(4); + RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); + elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); + 
RangerPolicy.RangerPolicyResource tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName); + elements.put(RANGER_TYPE_HIVE_TABLE, tablePolicyResource); + RangerPolicy.RangerPolicyResource columnPolicyResource = new RangerPolicy.RangerPolicyResource(columnName); + elements.put(RANGER_TYPE_HIVE_COLUMN, columnPolicyResource); + } else { + LOG.error("invalid qualifiedName for HIVE_COLUMN, qualifiedName=" + qualifiedName); + throw new Exception("invalid qualifiedName for HIVE_COLUMN, qualifiedName=" + qualifiedName); + } + + } + + RangerServiceResource ret = new RangerServiceResource(); + + ret.setGuid(entity.getId()._getId()); + ret.setServiceName(serviceName); + ret.setResourceElements(elements); + + return ret; + } + + public String getRangerServiceName(String clusterName) { + String ret = getRangerServiceName(COMPONENT_NAME, clusterName); + + if (StringUtils.isBlank(ret)) { + ret = clusterName + TagSyncConfig.TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR + COMPONENT_NAME; + } + return ret; + } + + public final List<String> getQualifiedNameComponents(IReferenceableInstance entity) throws Exception { + + String qualifiedNameAttributeName = getQualifiedNameAttributeName(entity.getTypeName()); + + String qualifiedName = getEntityAttribute(entity, qualifiedNameAttributeName, String.class); + + List<String> ret = getQualifiedNameComponents(entity.getTypeName(), qualifiedName); + + if (LOG.isDebugEnabled()) { + LOG.debug("----- Entity-Id:" + entity.getId()._getId()); + LOG.debug("----- Entity-Type-Name:" + entity.getTypeName()); + LOG.debug("----- Entity-Components -----"); + int i = 0; + for (String value : ret) { + LOG.debug("----- Index:" + i++ + " Value:" + value); + } + } + return ret; + } + + public final List<String> getQualifiedNameComponents(String entityTypeName, String qualifiedName) throws Exception { + + String qualifiedNameAttributeName = getQualifiedNameAttributeName(entityTypeName); + + if (StringUtils.isBlank(qualifiedName)) { + 
throw new Exception("Could not get a valid value for " + qualifiedNameAttributeName + " attribute from entity."); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Received .... " + qualifiedNameAttributeName + "=" + qualifiedName + " for entity type " + entityTypeName); + } + + String components[] = qualifiedName.split(clusterDelimiter); + + if (components.length != 2) { + throw new Exception("Qualified Name does not contain cluster-name, qualifiedName=" + qualifiedName); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("name-hierarchy=" + components[0] + ", cluster-name=" + components[1]); + } + + String nameHierarchy[] = components[0].split(qualifiedNameDelimiter); + + List<String> ret = new ArrayList<String>(); + + ret.add(qualifiedName); + ret.add(components[1]); + + ret.addAll(Arrays.asList(nameHierarchy)); + + return ret; + } + + public String getQualifiedNameAttributeName(String entityTypeName) { + return StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE) ? + ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE : ENTITY_ATTRIBUTE_QUALIFIED_NAME; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java index 803a8a9..2168983 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java @@ -27,9 +27,11 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; import 
org.apache.ranger.plugin.model.RangerServiceResource; import org.apache.ranger.plugin.model.RangerTag; import org.apache.ranger.plugin.model.RangerTagDef; +import org.apache.ranger.plugin.model.RangerTagDef.RangerTagAttributeDef; import org.apache.ranger.plugin.util.ServiceTags; import java.util.*; @@ -47,7 +49,7 @@ public class AtlasNotificationMapper { if (AtlasResourceMapperUtil.isEntityTypeHandled(entity.getTypeName())) { AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(entityNotification.getEntity(), entityNotification.getAllTraits()); - ret = buildServiceTags(entityWithTraits, 1L, 1L, null); + ret = buildServiceTags(entityWithTraits, null); } else { if (LOG.isDebugEnabled()) { LOG.debug("Ranger not interested in Entity Notification for entity-type " + entityNotification.getEntity().getTypeName()); @@ -99,83 +101,90 @@ public class AtlasNotificationMapper { Map<String, ServiceTags> ret = new HashMap<String, ServiceTags>(); - long serviceResourceIndex = 1L; - long tagIndex = 1L; - for (AtlasEntityWithTraits element : entitiesWithTraits) { - - ServiceTags serviceTags = buildServiceTags(element, serviceResourceIndex, tagIndex, ret); - - serviceResourceIndex++; - - tagIndex += CollectionUtils.size(serviceTags.getTags()); - + buildServiceTags(element, ret); } // Remove duplicate tag definitions - for (Map.Entry<String, ServiceTags> serviceTagsMapEntry : ret.entrySet()){ - - Map<Long, RangerTagDef> allTagDefs = serviceTagsMapEntry.getValue().getTagDefinitions(); - - Map<String, String> tagTypeIndex = new HashMap<String, String>(); - Map<Long, RangerTagDef> uniqueTagDefs = new HashMap<Long, RangerTagDef>(); - - for (Map.Entry<Long, RangerTagDef> entry : allTagDefs.entrySet()) { - String tagTypeName = entry.getValue().getName(); + if(CollectionUtils.isNotEmpty(ret.values())) { + for (ServiceTags serviceTag : ret.values()) { + if(MapUtils.isNotEmpty(serviceTag.getTagDefinitions())) { + Map<String, RangerTagDef> uniqueTagDefs = new HashMap<String, 
RangerTagDef>(); + + for (RangerTagDef tagDef : serviceTag.getTagDefinitions().values()) { + RangerTagDef existingTagDef = uniqueTagDefs.get(tagDef.getName()); + + if (existingTagDef == null) { + uniqueTagDefs.put(tagDef.getName(), tagDef); + } else { + if(CollectionUtils.isNotEmpty(tagDef.getAttributeDefs())) { + for(RangerTagAttributeDef tagAttrDef : tagDef.getAttributeDefs()) { + boolean attrDefExists = false; + + if(CollectionUtils.isNotEmpty(existingTagDef.getAttributeDefs())) { + for(RangerTagAttributeDef existingTagAttrDef : existingTagDef.getAttributeDefs()) { + if(StringUtils.equalsIgnoreCase(existingTagAttrDef.getName(), tagAttrDef.getName())) { + attrDefExists = true; + break; + } + } + } + + if(! attrDefExists) { + existingTagDef.getAttributeDefs().add(tagAttrDef); + } + } + } + } + } - if (tagTypeIndex.get(tagTypeName) == null) { - tagTypeIndex.put(tagTypeName, tagTypeName); - uniqueTagDefs.put(entry.getKey(), entry.getValue()); + serviceTag.getTagDefinitions().clear(); + for(RangerTagDef tagDef : uniqueTagDefs.values()) { + serviceTag.getTagDefinitions().put(tagDef.getId(), tagDef); + } } } - serviceTagsMapEntry.getValue().setTagDefinitions(uniqueTagDefs); } return ret; } - static private ServiceTags buildServiceTags(AtlasEntityWithTraits entityWithTraits, long index, long tagIndex, Map<String, ServiceTags> serviceTagsMap) throws Exception { - - ServiceTags ret = null; - - IReferenceableInstance entity = entityWithTraits.getEntity(); - - RangerServiceResource serviceResource = AtlasResourceMapperUtil.getRangerServiceResource(entity); + static private ServiceTags buildServiceTags(AtlasEntityWithTraits entityWithTraits, Map<String, ServiceTags> serviceTagsMap) throws Exception { + ServiceTags ret = null; + IReferenceableInstance entity = entityWithTraits.getEntity(); + RangerServiceResource serviceResource = AtlasResourceMapperUtil.getRangerServiceResource(entity); if (serviceResource != null) { - - serviceResource.setId(index); + List<RangerTag> tags = 
getTags(entityWithTraits); + List<RangerTagDef> tagDefs = getTagDefs(entityWithTraits); String serviceName = serviceResource.getServiceName(); - Map<Long, RangerTag> tags = getTags(entityWithTraits, tagIndex); - - Map<Long, RangerTagDef> tagDefs = getTagDefs(tags); + ret = createOrGetServiceTags(serviceTagsMap, serviceName); - Map<Long, List<Long>> resourceIdToTagIds = null; + serviceResource.setId((long)ret.getServiceResources().size()); + ret.getServiceResources().add(serviceResource); - resourceIdToTagIds = new HashMap<Long, List<Long>>(); - List<Long> tagList = new ArrayList<Long>(); + List<Long> tagIds = new ArrayList<Long>(); - if (MapUtils.isNotEmpty(tags)) { - resourceIdToTagIds = new HashMap<Long, List<Long>>(); + if(CollectionUtils.isNotEmpty(tags)) { + for(RangerTag tag : tags) { + tag.setId((long)ret.getTags().size()); + ret.getTags().put(tag.getId(), tag); - for (Map.Entry<Long, RangerTag> entry : tags.entrySet()) { - tagList.add(entry.getKey()); + tagIds.add(tag.getId()); } } + ret.getResourceToTagIds().put(serviceResource.getId(), tagIds); - resourceIdToTagIds.put(index, tagList); - - ret = createOrGetServiceTags(serviceTagsMap, serviceName); - - ret.getServiceResources().add(serviceResource); - ret.getTagDefinitions().putAll(tagDefs); - ret.getTags().putAll(tags); - ret.getResourceToTagIds().putAll(resourceIdToTagIds); - + if(CollectionUtils.isNotEmpty(tagDefs)) { + for(RangerTagDef tagDef : tagDefs) { + tagDef.setId((long)ret.getTagDefinitions().size()); + ret.getTagDefinitions().put(tagDef.getId(), tagDef); + } + } } else { - LOG.error("AtlasResourceMapper not found for entity-type:" + entity.getTypeName()); + LOG.error("Failed to build serviceResource for entity:" + entity.getId()._getId()); } return ret; @@ -199,67 +208,62 @@ public class AtlasNotificationMapper { return ret; } - static private Map<Long, RangerTag> getTags(AtlasEntityWithTraits entityWithTraits, long index) { - Map<Long, RangerTag> ret = null; - - ret = new HashMap<Long, 
RangerTag>(); - - List<IStruct> traits = entityWithTraits.getAllTraits(); - - for (IStruct trait : traits) { + static private List<RangerTag> getTags(AtlasEntityWithTraits entityWithTraits) { + List<RangerTag> ret = new ArrayList<RangerTag>(); - String traitName = trait.getTypeName(); + if(entityWithTraits != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) { + List<IStruct> traits = entityWithTraits.getAllTraits(); - Map<String, String> tagAttrValues = new HashMap<String, String>(); + for (IStruct trait : traits) { + Map<String, String> tagAttrs = new HashMap<String, String>(); - try { + try { + Map<String, Object> attrs = trait.getValuesMap(); - Map<String, Object> attrValues = trait.getValuesMap(); + if(MapUtils.isNotEmpty(attrs)) { + for (Map.Entry<String, Object> attrEntry : attrs.entrySet()) { + String attrName = attrEntry.getKey(); + Object attrValue = attrEntry.getValue(); - for (Map.Entry<String, Object> attrValueEntry : attrValues.entrySet()) { - String attrName = attrValueEntry.getKey(); - Object attrValue = attrValueEntry.getValue(); - try { - String strValue = String.class.cast(attrValue); - tagAttrValues.put(attrName, strValue); - } catch (ClassCastException exception) { - LOG.error("Cannot cast attribute-value to String, skipping... attrName=" + attrName); + tagAttrs.put(attrName, attrValue != null ? 
attrValue.toString() : null); + } } + } catch (AtlasException exception) { + LOG.error("Could not get values for trait:" + trait.getTypeName(), exception); } - } catch (AtlasException exception) { - LOG.error("Could not get values for trait:" + traitName, exception); - } - - RangerTag tag = new RangerTag(); - - tag.setType(traitName); - tag.setAttributes(tagAttrValues); - - ret.put(index++, tag); + ret.add(new RangerTag(trait.getTypeName(), tagAttrs)); + } } return ret; } - static private Map<Long, RangerTagDef> getTagDefs(Map<Long, RangerTag> tags) { + static private List<RangerTagDef> getTagDefs(AtlasEntityWithTraits entityWithTraits) { + List<RangerTagDef> ret = new ArrayList<RangerTagDef>(); - Map<Long, RangerTagDef> ret = null; + if(entityWithTraits != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) { + List<IStruct> traits = entityWithTraits.getAllTraits(); - if (MapUtils.isNotEmpty(tags)) { - ret = new HashMap<Long, RangerTagDef>(); + for (IStruct trait : traits) { + RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas"); - for (Map.Entry<Long, RangerTag> entry : tags.entrySet()) { + try { + Map<String, Object> attrs = trait.getValuesMap(); - RangerTagDef tagDef = new RangerTagDef(); - tagDef.setName(entry.getValue().getType()); - tagDef.setId(entry.getKey()); - ret.put(entry.getKey(), tagDef); + if(MapUtils.isNotEmpty(attrs)) { + for (String attrName : attrs.keySet()) { + tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string")); + } + } + } catch (AtlasException exception) { + LOG.error("Could not get values for trait:" + trait.getTypeName(), exception); + } + ret.add(tagDef); } } return ret; } - } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java ---------------------------------------------------------------------- diff --git 
// a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
// new file mode 100644
// index 0000000..fd94928
// --- /dev/null
// +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
// @@ -0,0 +1,74 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.ranger.tagsync.source.atlas;

import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.plugin.model.RangerServiceResource;
import org.apache.ranger.tagsync.process.TagSyncConfig;

import java.util.Properties;
import java.util.List;
import java.util.Map;

/**
 * Base class for mappers that turn an Atlas entity into a {@link RangerServiceResource}.
 *
 * Subclasses declare which Atlas entity types they handle and how a resource is built
 * from an entity; this class holds the configuration handed to {@link #initialize(Properties)}
 * and offers helpers for reading configuration and entity attributes.
 */
public abstract class AtlasResourceMapper {
    private static final Log LOG = LogFactory.getLog(AtlasResourceMapper.class);

    /** Configuration supplied through {@link #initialize(Properties)}; null until then. */
    protected Properties properties;

    public AtlasResourceMapper() {
    }

    /**
     * Stores the configuration used later by {@link #getRangerServiceName(String, String)}.
     *
     * @param properties tag-sync configuration
     */
    public void initialize(Properties properties) {
        this.properties = properties;
    }

    /**
     * @return names of the Atlas entity types this mapper can convert
     */
    abstract public List<String> getSupportedEntityTypes();

    /**
     * Builds the Ranger resource that corresponds to the given Atlas entity.
     *
     * @param entity Atlas entity to convert
     * @return the resource built from the entity
     * @throws Exception if the entity cannot be mapped
     */
    abstract public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception;

    /**
     * Looks up the configured Ranger service name for a component/Atlas-instance pair.
     *
     * The property name is the mapper prefix, the component name, the literal
     * ".instance.", the Atlas instance name and the mapper suffix, concatenated.
     *
     * @param componentName     component name used in the property key
     * @param atlasInstanceName Atlas instance (cluster) name used in the property key
     * @return the configured service name, or null when no such property exists or
     *         when this mapper has not been initialized with a configuration
     */
    protected String getRangerServiceName(String componentName, String atlasInstanceName) {
        // Without a configuration there is nothing to look up; report "not configured"
        // instead of failing with a NullPointerException.
        if (properties == null) {
            return null;
        }

        String propName = TagSyncConfig.TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX + componentName
                + ".instance." + atlasInstanceName
                + TagSyncConfig.TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX;

        return properties.getProperty(propName);
    }

    /**
     * Reads one attribute of an entity, cast to the requested type.
     *
     * @param entity entity whose value map is read
     * @param name   attribute name
     * @param type   expected attribute type
     * @return the attribute value, or null when the attribute is absent or the entity's
     *         values cannot be obtained (the failure is logged)
     * @throws ClassCastException if the stored value is not of the requested type
     */
    static protected <T> T getEntityAttribute(IReferenceableInstance entity, String name, Class<T> type) {
        T ret = null;

        try {
            Map<String, Object> valueMap = entity.getValuesMap();
            ret = getAttribute(valueMap, name, type);
        } catch (AtlasException exception) {
            LOG.error("Cannot get map of values for entity: " + entity.getId()._getId(), exception);
        }

        return ret;
    }

    /**
     * Fetches a value from an attribute map, cast to the requested type.
     *
     * @param map  attribute map; may be null
     * @param name key to look up
     * @param type expected value type
     * @return the value, or null when the map is null or holds no value for the key
     * @throws ClassCastException if the stored value is not of the requested type
     */
    static protected <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
        if (map == null) {
            return null;
        }

        return type.cast(map.get(name));
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
// ----------------------------------------------------------------------
// diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
// new file mode 100644
// index 0000000..f05d814
// --- /dev/null
// +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
// @@ -0,0 +1,124 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.ranger.tagsync.source.atlas;

import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.commons.lang.StringUtils;
import org.apache.ranger.plugin.model.RangerServiceResource;

import java.util.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.tagsync.process.TagSyncConfig;

/**
 * Registry of {@link AtlasResourceMapper} instances keyed by Atlas entity-type name.
 *
 * The registry is filled by {@link #initializeAtlasResourceMappers(Properties)} and
 * consulted by the lookup methods below.
 */
public class AtlasResourceMapperUtil {
    private static final Log LOG = LogFactory.getLog(AtlasResourceMapperUtil.class);

    /** Entity-type name to the mapper registered for it. */
    private static Map<String, AtlasResourceMapper> atlasResourceMappers = new HashMap<String, AtlasResourceMapper>();

    /**
     * @param entityTypeName Atlas entity-type name
     * @return true when a mapper is registered for the given entity type
     */
    public static boolean isEntityTypeHandled(String entityTypeName) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> isEntityTypeHandled(entityTypeName=" + entityTypeName + ")");
        }

        AtlasResourceMapper mapper = atlasResourceMappers.get(entityTypeName);

        boolean ret = mapper != null;

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== isEntityTypeHandled(entityTypeName=" + entityTypeName + ") : " + ret);
        }

        return ret;
    }

    /**
     * Builds the Ranger resource for an Atlas entity using the mapper registered for
     * the entity's type.
     *
     * @param atlasEntity entity to convert
     * @return the resource, or null when no mapper is registered for the entity type or
     *         the mapper failed (the failure is logged, not propagated)
     */
    public static RangerServiceResource getRangerServiceResource(IReferenceableInstance atlasEntity) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> getRangerServiceResource(" + atlasEntity.getId()._getId() +")");
        }

        RangerServiceResource resource = null;

        AtlasResourceMapper mapper = atlasResourceMappers.get(atlasEntity.getTypeName());

        if (mapper != null) {
            try {
                resource = mapper.buildResource(atlasEntity);
            } catch (Exception exception) {
                LOG.error("Could not get serviceResource for atlas entity:" + atlasEntity.getId()._getId() + ": ", exception);
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== getRangerServiceResource(" + atlasEntity.getId()._getId() +"): resource=" + resource);
        }

        return resource;
    }

    /**
     * Instantiates and registers the built-in Hive mapper plus any custom mappers named
     * in the configuration (a comma-separated list of class names).
     *
     * Every mapper is attempted even after an earlier one fails; each failure is logged.
     * Empty entries in the list (for example from a trailing comma) are ignored.
     *
     * @param properties tag-sync configuration; also passed to each mapper's
     *                   {@code initialize}
     * @return true when every named mapper was created and registered, false otherwise
     */
    static public boolean initializeAtlasResourceMappers(Properties properties) {
        final String MAPPER_NAME_DELIMITER = ",";

        String customMapperNames = TagSyncConfig.getCustomAtlasResourceMappers(properties);

        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasResourceMapperUtil.initializeAtlasResourceMappers(" + customMapperNames + ")");
        }
        boolean ret = true;

        String allResourceMappers = "org.apache.ranger.tagsync.source.atlas.AtlasHiveResourceMapper";

        if (StringUtils.isNotBlank(customMapperNames)) {
            allResourceMappers = allResourceMappers + MAPPER_NAME_DELIMITER + customMapperNames;
        }

        for (String mapperName : allResourceMappers.split(MAPPER_NAME_DELIMITER)) {
            mapperName = mapperName.trim();

            // An empty entry is a formatting slip in the configuration, not a mapper.
            if (mapperName.isEmpty()) {
                continue;
            }

            try {
                // asSubclass rejects classes that are not mappers before any instance is
                // created; the reflective constructor call wraps constructor failures in
                // a checked exception instead of letting them escape undeclared.
                Class<? extends AtlasResourceMapper> clazz = Class.forName(mapperName).asSubclass(AtlasResourceMapper.class);
                AtlasResourceMapper resourceMapper = clazz.getDeclaredConstructor().newInstance();

                resourceMapper.initialize(properties);

                for (String entityTypeName : resourceMapper.getSupportedEntityTypes()) {
                    add(entityTypeName, resourceMapper);
                }

            } catch (Exception exception) {
                LOG.error("Failed to create AtlasResourceMapper:" + mapperName + ": ", exception);
                ret = false;
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasResourceMapperUtil.initializeAtlasResourceMappers(" + allResourceMappers + "): " + ret);
        }
        return ret;
    }

    /** Registers the mapper for one entity type, replacing any earlier registration. */
    private static void add(String entityType, AtlasResourceMapper mapper) {
        atlasResourceMappers.put(entityType, mapper);
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
// ----------------------------------------------------------------------
// diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
// new file mode 100644
// index 0000000..2499177
// --- /dev/null
// +++
// b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java @@ -0,0 +1,199 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.ranger.tagsync.source.atlas;


import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provider;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.notification.entity.EntityNotification;

import org.apache.ranger.tagsync.model.AbstractTagSource;
import org.apache.ranger.plugin.util.ServiceTags;

import java.io.IOException;
import java.io.InputStream;
import java.util.*;

/**
 * Tag source that consumes Atlas entity notifications and forwards the resulting
 * {@link ServiceTags} to the sink.
 *
 * {@link #initialize(Properties)} sets up the resource mappers, checks the Atlas
 * client configuration and creates a notification consumer; {@link #start()} runs that
 * consumer on a daemon thread.
 */
public class AtlasTagSource extends AbstractTagSource {
    private static final Log LOG = LogFactory.getLog(AtlasTagSource.class);

    public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "application.properties";

    public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.kafka.bootstrap.servers";
    public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect";
    public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id";

    /** Consumer loop created by {@link #initialize(Properties)}; null until it succeeds. */
    private ConsumerRunnable consumerTask;

    /**
     * Prepares this source: registers the Atlas resource mappers, loads and validates
     * the Atlas application properties from the classpath, and creates one consumer for
     * entity notifications.
     *
     * @param properties tag-sync configuration passed to the resource mappers
     * @return true when every step succeeded and a consumer is ready to be started
     */
    @Override
    public boolean initialize(Properties properties) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasTagSource.initialize()");
        }

        Properties atlasProperties = new Properties();

        boolean ret = AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties);

        if (ret) {
            ret = loadAtlasProperties(atlasProperties);
        }

        if (ret) {
            ret = hasRequiredProperties(atlasProperties);
        }

        if (ret) {
            NotificationModule notificationModule = new NotificationModule();

            Injector injector = Guice.createInjector(notificationModule);

            Provider<NotificationInterface> consumerProvider = injector.getProvider(NotificationInterface.class);
            NotificationInterface notification = consumerProvider.get();
            List<NotificationConsumer<EntityNotification>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);

            // Report a missing consumer as an initialization failure rather than letting
            // get(0) throw out of initialize().
            if (iterators == null || iterators.isEmpty()) {
                ret = false;
                LOG.error("No consumer was created for Atlas entity notifications");
            } else {
                consumerTask = new ConsumerRunnable(iterators.get(0));
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasTagSource.initialize(), result=" + ret);
        }
        return ret;
    }

    /**
     * Loads the Atlas application properties file from the classpath into the given
     * object. The stream is always closed; a failure to close is logged only.
     *
     * @param atlasProperties receives the loaded properties
     * @return true when the file was found and loaded
     */
    private boolean loadAtlasProperties(Properties atlasProperties) {
        InputStream inputStream = getClass().getClassLoader().getResourceAsStream(TAGSYNC_ATLAS_PROPERTIES_FILE_NAME);

        if (inputStream == null) {
            LOG.error("Cannot find Atlas application properties file");
            return false;
        }

        boolean ret = true;

        try {
            atlasProperties.load(inputStream);
        } catch (Exception exception) {
            ret = false;
            LOG.error("Cannot load Atlas application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME, exception);
        } finally {
            try {
                inputStream.close();
            } catch (IOException ioException) {
                LOG.error("Cannot close Atlas application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME, ioException);
            }
        }

        return ret;
    }

    /**
     * Checks that the Kafka endpoints, ZooKeeper endpoint and consumer group are all
     * set. Every missing property is logged, not just the first one.
     *
     * @param atlasProperties loaded Atlas application properties
     * @return true when none of the required properties is blank
     */
    private static boolean hasRequiredProperties(Properties atlasProperties) {
        final String[] requiredNames = {
                TAGSYNC_ATLAS_KAFKA_ENDPOINTS,
                TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT,
                TAGSYNC_ATLAS_CONSUMER_GROUP
        };

        boolean ret = true;

        for (String requiredName : requiredNames) {
            if (StringUtils.isBlank(atlasProperties.getProperty(requiredName))) {
                ret = false;
                LOG.error("Value of property '" + requiredName + "' is not specified!");
            }
        }

        return ret;
    }

    /**
     * Starts the consumer loop on a new daemon thread.
     *
     * @return true when the thread was started; false when no consumer task exists
     *         (that is, {@link #initialize(Properties)} did not create one)
     */
    @Override
    public boolean start() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasTagSource.start()");
        }
        Thread consumerThread = null;
        if (consumerTask == null) {
            LOG.error("No consumerTask!!!");
        } else {
            consumerThread = new Thread(consumerTask);
            consumerThread.setDaemon(true);
            consumerThread.start();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasTagSource.start()");
        }
        return consumerThread != null;
    }

    /** Always reports a change. */
    @Override
    public boolean isChanged() {
        return true;
    }

    /** Renders a notification (operation type, entity and traits) for log output. */
    private static String getPrintableEntityNotification(EntityNotification notification) {
        StringBuilder sb = new StringBuilder();

        sb.append("{ Notification-Type: ").append(notification.getOperationType()).append(", ");
        AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(notification.getEntity(), notification.getAllTraits());
        sb.append(entityWithTraits.toString());
        sb.append("}");
        return sb.toString();
    }

    /**
     * Loop that pulls entity notifications from the consumer, maps each one to
     * {@link ServiceTags} and hands the result to the sink.
     */
    private class ConsumerRunnable implements Runnable {

        private final NotificationConsumer<EntityNotification> consumer;

        private ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) {
            this.consumer = consumer;
        }

        /**
         * @return whether the consumer has another notification; an exception raised by
         *         the consumer is logged and treated as "nothing available"
         */
        private boolean hasNext() {
            boolean ret = false;
            try {
                ret = consumer.hasNext();
            } catch (Exception exception) {
                LOG.error("EntityNotification consumer threw exception, IGNORING...:", exception);
            }
            return ret;
        }

        /**
         * Runs until the {@code shutdown} flag is set. The flag and {@code updateSink}
         * are not declared in this class; presumably both are inherited from
         * AbstractTagSource.
         */
        @Override
        public void run() {
            if (LOG.isDebugEnabled()) {
                LOG.debug("==> ConsumerRunnable.run()");
            }
            while (!shutdown) {
                if (hasNext()) {
                    EntityNotification notification = consumer.next();
                    if (notification != null) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification=" + getPrintableEntityNotification(notification));
                        }

                        ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification);
                        if (serviceTags == null) {
                            LOG.error("Failed to create ServiceTags for notification :" + getPrintableEntityNotification(notification));
                        } else {
                            updateSink(serviceTags);
                        }
                    }
                }
            }
            LOG.info("Shutting down the Tag-Atlas-source thread");
        }
    }
}
