http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java new file mode 100644 index 0000000..243aee5 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java @@ -0,0 +1,589 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.tagsync.source.atlas; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +import com.google.inject.Guice; +import com.google.inject.Injector; + +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.atlas.typesystem.EntityImpl; +import org.apache.atlas.typesystem.IdImpl; +import org.apache.atlas.typesystem.TraitImpl; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.atlas.notification.NotificationModule; +import org.apache.atlas.notification.entity.EntityNotification; +import org.apache.atlas.notification.entity.EntityNotificationConsumer; +import org.apache.atlas.notification.entity.EntityNotificationConsumerProvider; +import org.apache.atlas.typesystem.api.Entity; +import org.apache.atlas.typesystem.api.Trait; +import org.apache.ranger.admin.client.datatype.RESTResponse; +import org.apache.ranger.tagsync.model.TagSink; +import org.apache.ranger.tagsync.model.TagSource; +import org.apache.ranger.plugin.util.RangerRESTClient; +import org.apache.ranger.plugin.util.RangerRESTUtils; +import org.apache.ranger.plugin.util.ServiceTags; +import org.apache.ranger.tagsync.process.TagSyncConfig; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class TagAtlasSource implements TagSource { + private static final Log LOG = LogFactory.getLog(TagAtlasSource.class); + + + private final Map<String, Entity> entities = new LinkedHashMap<>(); + private TagSink tagSink; + private Properties properties; + private ConsumerRunnable consumerTask; + + @Override + public boolean initialize(Properties properties) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagAtlasSource.initialize()"); + } + + boolean ret = true; + + if (properties == null || MapUtils.isEmpty(properties)) { + LOG.error("No properties specified for TagFileSource initialization"); + this.properties = new Properties(); + } else { + this.properties = properties; + } + + + NotificationModule notificationModule = new NotificationModule(); + + Injector injector = Guice.createInjector(notificationModule); + + EntityNotificationConsumerProvider consumerProvider = injector.getInstance(EntityNotificationConsumerProvider.class); + + consumerTask = new ConsumerRunnable(consumerProvider.get()); + + //ExecutorService executorService = Executors.newFixedThreadPool(1); + + //executorService.submit(new ConsumerRunnable(consumerProvider.get())); + + + if (LOG.isDebugEnabled()) { + LOG.debug("<== TagAtlasSource.initialize(), result=" + ret); + } + return ret; + } + + @Override + public void setTagSink(TagSink sink) { + if (sink == null) { + LOG.error("Sink is null!!!"); + } else { + this.tagSink = sink; + } + } + + @Override + public Thread start() { + Thread consumerThread = null; + if (consumerTask == null) { + LOG.error("No consumerTask!!!"); + } else { + consumerThread = new Thread(consumerTask); + consumerThread.setDaemon(true); + consumerThread.start(); + } + return consumerThread; + } + + @Override + public void updateSink() throws Exception { + } + + @Override + public boolean isChanged() { + return true; + } + + // ----- inner class : ConsumerRunnable ------------------------------------ + + private class ConsumerRunnable implements Runnable { + + private final EntityNotificationConsumer consumer; + + private ConsumerRunnable(EntityNotificationConsumer consumer) { + this.consumer = consumer; + } + + + // ----- Runnable -------------------------------------------------------- + + @Override + public void run() { + while (consumer.hasNext()) { + try { + EntityNotification notification = consumer.next(); + if (notification != null) { + printNotification(notification); + ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification, properties); + if (serviceTags == null) { + LOG.error("Failed to map Atlas notification to ServiceTags structure"); + } else { + if (LOG.isDebugEnabled()) { + String serviceTagsJSON = new Gson().toJson(serviceTags); + LOG.debug("Atlas notification mapped to serviceTags=" + serviceTagsJSON); + } + + try { + tagSink.uploadServiceTags(serviceTags); + } catch (Exception exception) { + LOG.error("uploadServiceTags() failed..", exception); + } + } + } + } catch(Exception e){ + LOG.error("Exception encountered when processing notification:", e); + } + } + } + + public void printNotification(EntityNotification notification) { + Entity entity = notification.getEntity(); + if (LOG.isDebugEnabled()) { + LOG.debug("Notification-Type: " + notification.getOperationType().name()); + LOG.debug("Entity-Id: " + entity.getId().getGuid()); + LOG.debug("Entity-Type: " + entity.getTypeName()); + + LOG.debug("----------- Entity Values ----------"); + + + for (Map.Entry<String, Object> entry : entity.getValues().entrySet()) { + LOG.debug(" Name:" + entry.getKey()); + Object value = entry.getValue(); + LOG.debug(" Value:" + value); + } + + LOG.debug("----------- Entity Traits ----------"); + + + for (Map.Entry<String, ? extends Trait> entry : entity.getTraits().entrySet()) { + LOG.debug(" Trait-Name:" + entry.getKey()); + Trait trait = entry.getValue(); + LOG.debug(" Trait-Type:" + trait.getTypeName()); + Map<String, Object> traitValues = trait.getValues(); + for (Map.Entry<String, Object> valueEntry : traitValues.entrySet()) { + LOG.debug(" Trait-Value-Name:" + valueEntry.getKey()); + LOG.debug(" Trait-Value:" + valueEntry.getValue()); + } + } + + } + } + + } + + public void printAllEntities() { + try { + new AtlasUtility().getAllEntities(); + } + catch(java.io.IOException ioException) { + LOG.error("Caught IOException while retrieving Atlas Entities:", ioException); + } + } + + // update the set of entities with current from Atlas + public void refreshAllEntities() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagAtlasSource.refreshAllEntities()"); + } + AtlasUtility atlasUtility = new AtlasUtility(); + + try { + entities.putAll(atlasUtility.getAllEntities()); + } catch (IOException e) { + LOG.error("getAllEntities() failed", e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("<== TagAtlasSource.refreshAllEntities()"); + } + } + + // Inner class AtlasUtil + + /** + * Atlas utility. + */ + @SuppressWarnings("unchecked") + private class AtlasUtility { + + /** + * Atlas APIs + */ + public static final String API_ATLAS_TYPES = "api/atlas/types"; + public static final String API_ATLAS_ENTITIES = "api/atlas/entities?type="; + public static final String API_ATLAS_ENTITY = "api/atlas/entities/"; + public static final String API_ATLAS_TYPE = "api/atlas/types/"; + + /** + * API Response Attributes + */ + public static final String RESULTS_ATTRIBUTE = "results"; + public static final String DEFINITION_ATTRIBUTE = "definition"; + public static final String VALUES_ATTRIBUTE = "values"; + public static final String TRAITS_ATTRIBUTE = "traits"; + public static final String TYPE_NAME_ATTRIBUTE = "typeName"; + public static final String TRAIT_TYPES_ATTRIBUTE = "traitTypes"; + public static final String SUPER_TYPES_ATTRIBUTE = "superTypes"; + public static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = "attributeDefinitions"; + public static final String NAME_ATTRIBUTE = "name"; + + private Type mapType = new TypeToken<Map<String, Object>>(){}.getType(); + + private RangerRESTClient restClient; + + + // ----- Constructors ------------------------------------------------------ + + /** + * Construct an AtlasUtility + * + */ + public AtlasUtility() { + + String url = TagSyncConfig.getAtlasEndpoint(properties); + String sslConfigFileName = TagSyncConfig.getAtlasSslConfigFileName(properties); + + + if(LOG.isDebugEnabled()) { + LOG.debug("Initializing RangerRestClient with (url=" + url + ", sslConfigFileName" + sslConfigFileName + ")"); + } + + restClient = new RangerRESTClient(url, sslConfigFileName); + + if(LOG.isDebugEnabled()) { + LOG.debug("Initialized RangerRestClient with (url=" + url + ", sslConfigFileName=" + sslConfigFileName + ")"); + } + } + + + // ----- AtlasUtility ------------------------------------------------------ + + /** + * Get all of the entities defined in Atlas. + * + * @return a mapping of GUIDs to Atlas entities + * + * @throws IOException if there is an error communicating with Atlas + */ + public Map<String, Entity> getAllEntities() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagAtlasSource.getAllEntities()"); + } + Map<String, Entity> entities = new LinkedHashMap<>(); + + Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES); + + List<String> types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class); + + for (String type : types) { + + Map<String, Object> entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type); + + List<String> guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class); + + for (String guid : guids) { + + if (StringUtils.isNotBlank(guid)) { + + Map<Trait, Map<String, ? extends Trait>> traitSuperTypes = new HashMap<>(); + + Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid); + + if (entityResponse.containsKey(DEFINITION_ATTRIBUTE)) { + String definitionJSON = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, String.class); + + LOG.info("{"); + LOG.info(" \"entity-id\":" + guid + ","); + LOG.info(" \"entity-definition\":" + definitionJSON); + LOG.info("}"); + + Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType); + + Map<String, Object> values = getAttribute(definition, VALUES_ATTRIBUTE, Map.class); + Map<String, Object> traits = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class); + String typeName = getAttribute(definition, TYPE_NAME_ATTRIBUTE, String.class); + + LOG.info("Received entity(typeName=" + typeName + ", id=" + guid + ")"); + + + Map<String, TraitImpl> traitMap = new HashMap<>(); + + if (MapUtils.isNotEmpty(traits)) { + + LOG.info("Traits for entity(typeName=" + typeName + ", id=" + guid + ") ------ "); + + for (Map.Entry<String, Object> entry : traits.entrySet()) { + + Map<String, Object> trait = (Map<String, Object>) entry.getValue(); + + Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class); + String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class); + + Map<String, TraitImpl> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues); + + TraitImpl trait1 = new TraitImpl(traitTypeName, traitValues, superTypes); + + traitSuperTypes.put(trait1, superTypes); + + traitMap.put(entry.getKey(), trait1); + + + LOG.info(" Trait(typeName=" + traitTypeName + ")"); + + } + } else { + LOG.info("No traits for entity(typeName=" + typeName + ", id=" + guid + ")"); + } + EntityImpl entity = new EntityImpl(new IdImpl(guid, 0), typeName, values, traitMap); + + showEntity(entity); + + entities.put(guid, entity); + + } + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagAtlasSource.getAllEntities()"); + } + return entities; + } + + + // ----- helper methods ---------------------------------------------------- + + private Map<String, Object> getTraitType(String traitName) + throws IOException { + + Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName); + + if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) { + String definitionJSON = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, String.class); + + Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType); + + List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class); + + if (traitTypes.size() > 0) { + return (Map<String, Object>) traitTypes.get(0); + } + } + return null; + } + + private Map<String, TraitImpl> getTraitSuperTypes(Map<String, Object> traitType, Map<String, Object> values) + throws IOException { + + Map<String, TraitImpl> superTypes = new HashMap<>(); + + if (traitType != null) { + + List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class); + + for (String superTypeName : superTypeNames) { + + Map<String, Object> superTraitType = getTraitType(superTypeName); + + if (superTraitType != null) { + List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE); + + Map<String, Object> superTypeValues = new HashMap<>(); + for (Map<String, Object> attributeDefinition : attributeDefinitions) { + + String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString(); + if (values.containsKey(attributeName)) { + superTypeValues.put(attributeName, values.get(attributeName)); + } + } + + superTypes.put(superTypeName, + //new TraitImpl(getTraitSuperTypes(superTraitType, superTypeValues), superTypeValues, superTypeName)); + new TraitImpl(superTypeName, superTypeValues, getTraitSuperTypes(superTraitType, superTypeValues))); + } + } + } + return superTypes; + } + + /* + private Map<String, Object> atlasAPI(String endpoint) throws IOException { + InputStream in = streamProvider.readFrom(atlasEndpoint + endpoint, "GET", (String) null, Collections.<String, String>emptyMap()); + return new Gson().fromJson(IOUtils.toString(in, "UTF-8"), mapType); + } + */ + + private Map<String, Object> atlasAPI(String endpoint) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagAtlasSource.atlasAPI(" + endpoint +")"); + } + // Create a REST client and perform a get on it + Map<String, Object> ret = new HashMap<String, Object>(); + + WebResource webResource = restClient.getResource(endpoint); + + ClientResponse response = webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class); + + if(response != null && response.getStatus() == 200) { + ret = response.getEntity(ret.getClass()); + } else { + LOG.error("Atlas REST call returned with response={" + response +"}"); + + RESTResponse resp = RESTResponse.fromClientResponse(response); + LOG.error("Error getting Atlas Entity. request=" + webResource.toString() + + ", response=" + resp.toString()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("<== TagAtlasSource.atlasAPI(" + endpoint + ")"); + } + return ret; + } + + private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) { + return type.cast(map.get(name)); + } + + + + public void showEntity(Entity entity) { + + LOG.debug("Entity-id :" + entity.getId()); + + LOG.debug("Type: " + entity.getTypeName()); + + LOG.debug("----- Values -----"); + + for (Map.Entry<String, Object> entry : entity.getValues().entrySet()) { + LOG.debug(" Name: " + entry.getKey() + ""); + Object value = entry.getValue(); + LOG.debug(" Value: " + getValue(value, entities.keySet())); + } + + LOG.debug("----- Traits -----"); + + for (String traitName : entity.getTraits().keySet()) { + LOG.debug(" Name:" + entity.getId() + ", trait=" + traitName + ">" + traitName); + } + + } + + public void showTrait(Entity entity, String traitId) { + + String[] traitNames = traitId.split(","); + + Trait trait = entity.getTraits().get(traitNames[0]); + + for (int i = 1; i < traitNames.length; ++i ) { + trait = trait.getSuperTypes().get(traitNames[i]); + } + + String typeName = trait.getTypeName(); + + LOG.debug("Trait " + typeName + " for Entity id=" + entity.getId()); + + LOG.debug("Type: " + typeName); + + LOG.debug("----- Values ------"); + + for (Map.Entry<String, Object> entry : trait.getValues().entrySet()) { + LOG.debug("Name:" + entry.getKey()); + Object value = entry.getValue(); + LOG.debug("Value:" + getValue(value, entities.keySet())); + } + + LOG.debug("Super Traits"); + + + for (String traitName : trait.getSuperTypes().keySet()) { + LOG.debug("Name=" + entity.getId() + "&trait=" + traitId + "," + traitName + ">" + traitName); + } + } + + // resolve the given value if necessary + private String getValue(Object value, Set<String> ids) { + if (value == null) { + return ""; + } + String idString = getIdValue(value, ids); + if (idString != null) { + return idString; + } + + idString = getIdListValue(value, ids); + if (idString != null) { + return idString; + } + + return value.toString(); + } + // get an id from the given value; return null if the value is not an id type + private String getIdValue(Object value, Set<String> ids) { + if (value instanceof Map) { + Map map = (Map) value; + if (map.size() == 3 && map.containsKey("id")){ + String id = map.get("id").toString(); + if (ids.contains(id)) { + return id; + } + } + } + return null; + } + // get an id list from the given value; return null if the value is not an id list type + private String getIdListValue(Object value, Set<String> ids) { + if (value instanceof List) { + List list = (List) value; + if (list.size() > 0) { + StringBuilder sb = new StringBuilder(); + for (Object o : list) { + String idString = getIdValue(o, ids); + if (idString == null) { + return value.toString(); + } + if (sb.length() > 0) { + sb.append(", "); + } + sb.append(idString); + } + return sb.toString(); + } + } + return null; + } + + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java new file mode 100644 index 0000000..925a712 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.tagsync.source.file; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.tagsync.model.TagSink; +import org.apache.ranger.tagsync.model.TagSource; +import org.apache.ranger.plugin.util.ServiceTags; +import org.apache.ranger.tagsync.process.TagSyncConfig; + +import java.io.*; +import java.util.Date; +import java.util.Properties; + +public class TagFileSource implements TagSource, Runnable { + private static final Log LOG = LogFactory.getLog(TagFileSource.class); + + private String sourceFileName; + private long lastModifiedTimeInMillis = 0L; + + private Gson gson; + private TagSink tagSink; + private Properties properties; + + @Override + public boolean initialize(Properties properties) { + + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagFileSource.initialize()"); + } + + if (properties == null || MapUtils.isEmpty(properties)) { + LOG.error("No properties specified for TagFileSource initialization"); + this.properties = new Properties(); + } else { + this.properties = properties; + } + + boolean ret = true; + + if (ret) { + + sourceFileName = TagSyncConfig.getTagSourceFileName(properties); + + if (LOG.isDebugEnabled()) { + LOG.debug("Provided sourceFileName=" + sourceFileName); + } + + String realFileName = TagSyncConfig.getResourceFileName(sourceFileName); + if (realFileName != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Real sourceFileName=" + realFileName); + } + sourceFileName = realFileName; + } else { + LOG.error(sourceFileName + " is not a file or is not readable"); + ret = false; + } + } + + if (ret) { + try { + gson = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create(); + } catch (Throwable excp) { + LOG.fatal("failed to create GsonBuilder object", excp); + ret = false; + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("<== TagFileSource.initialize(): sourceFileName=" + sourceFileName + ", result=" + ret); + } + + return ret; + } + + @Override + public void setTagSink(TagSink sink) { + if (sink == null) { + LOG.error("Sink is null!!!"); + } else { + this.tagSink = sink; + } + } + + @Override + public Thread start() { + + Thread fileMonitoringThread = null; + + fileMonitoringThread = new Thread(this); + fileMonitoringThread.setDaemon(true); + fileMonitoringThread.start(); + + return fileMonitoringThread; + } + + @Override + public void run() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagFileSource.run()"); + } + long sleepTimeBetweenCycleInMillis = TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties); + boolean shutdownFlag = false; + + while (!shutdownFlag) { + + try { + if (isChanged()) { + LOG.info("Begin: update tags from source==>sink"); + if (TagSyncConfig.isTagSyncEnabled(properties)) { + updateSink(); + LOG.info("End: update tags from source==>sink"); + } else { + LOG.info("Tag-sync is not enabled."); + } + } else { + LOG.debug("TagFileSource: no change found for synchronization."); + } + + LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds"); + + Thread.sleep(sleepTimeBetweenCycleInMillis); + } + catch (InterruptedException e) { + LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before attempting to synchronize tag information", e); + shutdownFlag = true; + } + catch (Throwable t) { + LOG.error("tag-sync thread got an error", t); + } + } + + LOG.info("Shutting down the Tag-file-source thread"); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== TagFileSource.run()"); + } + } + + @Override + public void updateSink() throws Exception { + + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagFileSource.updateSink()"); + } + ServiceTags serviceTags = readFromFile(); + + if (serviceTags != null) { + tagSink.uploadServiceTags(serviceTags); + } else { + LOG.error("Could not read ServiceTags from file"); + } + if (LOG.isDebugEnabled()) { + LOG.debug("<== TagFileSource.updateSink()"); + } + } + + @Override + public boolean isChanged() { + + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagFileSource.isChanged()"); + } + boolean ret = false; + + long modificationTime = getModificationTime(); + + if (modificationTime > lastModifiedTimeInMillis) { + if (LOG.isDebugEnabled()) { + Date modifiedDate = new Date(modificationTime); + Date lastModifiedDate = new Date(lastModifiedTimeInMillis); + LOG.debug("File modified at " + modifiedDate + "last-modified at " + lastModifiedDate); + } + lastModifiedTimeInMillis = modificationTime; + ret = true; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== TagFileSource.isChanged(): result=" + ret); + } + return ret; + } + + private ServiceTags readFromFile() { + + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagFileSource.readFromFile(): sourceFileName=" + sourceFileName); + } + + ServiceTags ret = null; + + Reader reader = null; + try { + + reader = new InputStreamReader(TagSyncConfig.getFileInputStream(sourceFileName)); + + ret = gson.fromJson(reader, ServiceTags.class); + + } + catch (FileNotFoundException exception) { + LOG.warn("Tag-source file does not exist or not readble '" + sourceFileName + "'"); + } + catch (Exception excp) { + LOG.error("failed to load service-tags from Tag-source file " + sourceFileName, excp); + } + finally { + if (reader != null) { + try { + reader.close(); + } catch (Exception excp) { + LOG.error("error while closing opened Tag-source file " + sourceFileName, excp); + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== TagFileSource.readFromFile(): sourceFileName=" + sourceFileName); + } + + return ret; + } + + private long getModificationTime() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagFileSource.getLastModificationTime(): sourceFileName=" + sourceFileName); + } + long ret = 0L; + + File sourceFile = new File(sourceFileName); + + if (sourceFile.exists() && sourceFile.isFile() && sourceFile.canRead()) { + ret = sourceFile.lastModified(); + } else { + ret = new Date().getTime(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("<== TagFileSource.lastModificationTime(): sourceFileName=" + sourceFileName + " result=" + new Date(ret)); + } + + return ret; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/resources/application.properties ---------------------------------------------------------------------- diff --git a/tagsync/src/main/resources/application.properties b/tagsync/src/main/resources/application.properties deleted file mode 100644 index 7c874b6..0000000 --- a/tagsync/src/main/resources/application.properties +++ /dev/null @@ -1,29 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -# This file is used currently to satisfy needs of Injection of EntityChangeConsumer and its -# initialization. -# -# Basic configuration required to create EntityChangeConsumer -# -atlas.notification.kafka.bootstrap.servers=ranger-tag-policy-akulkarni-1:6667 -atlas.notification.kafka.zookeeper.connect=ranger-tag-policy-akulkarni-1:2181 - -# -# These properties seem to be internal to Atlas. They probably are used for generating notifications. -atlas.notification.embedded=false -atlas.notification.kafka.acks=1 -atlas.notification.kafka.data=${sys:atlas.home}/data/kafka http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/resources/ranger-tagsync-default.xml ---------------------------------------------------------------------- diff --git a/tagsync/src/main/resources/ranger-tagsync-default.xml b/tagsync/src/main/resources/ranger-tagsync-default.xml index fabe04e..5f754f9 100644 --- a/tagsync/src/main/resources/ranger-tagsync-default.xml +++ b/tagsync/src/main/resources/ranger-tagsync-default.xml @@ -18,14 +18,6 @@ <configuration> <property> - <name>ranger.tagsync.port</name> - <value>6161</value> - </property> - <property> - <name>ranger.tagsync.ssl</name> - <value>true</value> - </property> - <property> <name>ranger.tagsync.enabled</name> <value>true</value> </property> @@ -34,11 +26,6 @@ <value>./log</value> </property> <property> - <name>ranger.authentication.method</name> - <value>NONE</value> - <description></description> - </property> - <property> <name>ranger.tagsync.tagadmin.rest.url</name> <value>http://localhost:6080</value> <description></description> @@ -49,12 +36,12 @@ <description></description> </property> <property> - <name>ranger.tagsync.policymanager.basicauth.username</name> + <name>ranger.tagsync.tagadmin.basicauth.username</name> <value>admin</value> <description></description> </property> <property> - <name>ranger.tagsync.policymanager.basicauth.password</name> + <name>ranger.tagsync.tagadmin.basicauth.password</name> <value>admin</value> <description></description> </property> @@ -64,28 +51,28 @@ <description></description> </property> <property> - <name>ranger.tagsync.source.file</name> + <name>ranger.tagsync.filesource.filename</name> <value>/etc/ranger/data/tags.json</value> <description></description> </property> <property> <name>ranger.tagsync.source.impl.class</name> - <value>org.apache.ranger.source.file.TagFileSource</value> + <value>file</value> <description></description> </property> <property> <name>ranger.tagsync.sink.impl.class</name> - <value>org.apache.ranger.sink.policymgr.TagRESTSink</value> + <value>tagadmin</value> <description></description> </property> <property> - <name>atlas.endpoint</name> + <name>ranger.tagsync.atlassource.endpoint</name> <value>http://localhost:21000/</value> <description></description> </property> - <property> - <name>ranger.tagsync.atlas.hive.instance.c1.ranger.service</name> - <value>cl1_hive</value> - <description></description> - </property> + <property> + <name>ranger.tagsync.atlas.hive.instance.c1.ranger.service</name> + <value>cl1_hive</value> + <description></description> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/test/java/org/apache/ranger/process/TestTagSynchronizer.java ---------------------------------------------------------------------- diff --git a/tagsync/src/test/java/org/apache/ranger/process/TestTagSynchronizer.java b/tagsync/src/test/java/org/apache/ranger/process/TestTagSynchronizer.java deleted file mode 100644 index e693696..0000000 --- a/tagsync/src/test/java/org/apache/ranger/process/TestTagSynchronizer.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ranger.process; - - -import org.apache.ranger.model.TagSource; -import org.apache.ranger.source.atlas.TagAtlasSource; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.*; -import java.util.Properties; - -import static org.junit.Assert.*; - - -public class TestTagSynchronizer { - - private static TagSynchronizer tagSynchronizer; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - System.out.println("setUpBeforeClass() called"); - - TagSyncConfig config = TagSyncConfig.getInstance(); - - TagSyncConfig.dumpConfiguration(config, new BufferedWriter(new OutputStreamWriter(System.out))); - - Properties props = config.getProperties(); - - tagSynchronizer = new TagSynchronizer(props); - - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - System.out.println("tearDownAfterClass() called"); - - } - - @Test - public void testTagSynchronizer() { - - System.out.println("testTagSynchronizer() called"); - - //tagSynchronizer.run(); - - tagSynchronizer.shutdown("From testTagSynchronizer: time=up"); - - System.out.println("Exiting test"); - - - } - - @Test - public void testTagDownload() { - - boolean initDone = tagSynchronizer.initLoop(); - - System.out.println("TagSynchronizer initialization result=" + initDone); - - /* - TagSource tagSource = tagSynchronizer.getTagSource(); - - try { - TagAtlasSource tagAtlasSource = (TagAtlasSource) tagSource; - //tagAtlasSource.printAllEntities(); - } catch (ClassCastException exception) { - System.err.println("TagSource is not of TagAtlasSource"); - } - */ - - System.out.println("Exiting testTagDownload()"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java ---------------------------------------------------------------------- diff --git a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java new file mode 100644 index 0000000..10be4e6 --- /dev/null +++ b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.tagsync.process; + + +import org.apache.ranger.tagsync.model.TagSource; +import org.apache.ranger.tagsync.process.TagSyncConfig; +import org.apache.ranger.tagsync.process.TagSynchronizer; +import org.apache.ranger.tagsync.source.atlas.TagAtlasSource; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.*; +import java.util.Properties; + +import static org.junit.Assert.*; + + +public class TestTagSynchronizer { + + private static TagSynchronizer tagSynchronizer; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + System.out.println("setUpBeforeClass() called"); + + TagSyncConfig config = TagSyncConfig.getInstance(); + + TagSyncConfig.dumpConfiguration(config, new BufferedWriter(new OutputStreamWriter(System.out))); + + Properties props = config.getProperties(); + + tagSynchronizer = new TagSynchronizer(props); + + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + System.out.println("tearDownAfterClass() called"); + + } + + @Test + public void testTagSynchronizer() { + + System.out.println("testTagSynchronizer() called"); + + //tagSynchronizer.run(); + + tagSynchronizer.shutdown("From testTagSynchronizer: time=up"); + + System.out.println("Exiting test"); + + + } + + @Test + public void testTagDownload() { + + boolean initDone = tagSynchronizer.initLoop(); + + System.out.println("TagSynchronizer initialization result=" + initDone); + + /* + TagSource tagSource = tagSynchronizer.getTagSource(); + + try { + TagAtlasSource tagAtlasSource = (TagAtlasSource) tagSource; + //tagAtlasSource.printAllEntities(); + } catch (ClassCastException exception) { + System.err.println("TagSource is not of TagAtlasSource"); + } + */ + + System.out.println("Exiting testTagDownload()"); + } +}
