Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2335#discussion_r156982816
--- Diff:
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
---
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static
org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+ private static final Logger logger =
LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+ private static NiFiAtlasClient nifiClient;
+ private AtlasClientV2 atlasClient;
+
+ private NiFiAtlasClient() {
+ super();
+ }
+
+ public static NiFiAtlasClient getInstance() {
+ if (nifiClient == null) {
+ synchronized (NiFiAtlasClient.class) {
+ if (nifiClient == null) {
+ nifiClient = new NiFiAtlasClient();
+ }
+ }
+ }
+ return nifiClient;
+ }
+
+ /**
+  * (Re)creates the underlying Atlas client.
+  * Runs while holding the NiFiAtlasClient class lock.
+  *
+  * @param baseUrls Atlas base URLs handed to {@code authN.createClient}
+  * @param authN creates the Atlas client
+  * @param atlasConfDir when not null, its absolute path is published as the
+  *                     "atlas.conf" system property before the client is created
+  */
+ public void initialize(final String[] baseUrls, final AtlasAuthN authN, final File atlasConfDir) {
+     synchronized (NiFiAtlasClient.class) {
+         final boolean replacingExistingClient = atlasClient != null;
+         if (replacingExistingClient) {
+             logger.info("{} had been setup but replacing it with new one.", atlasClient);
+             // A client existed before, so make Atlas re-read its application properties.
+             ApplicationProperties.forceReload();
+         }
+
+         // If atlasConfDir is not set, atlas-application.properties will be searched under classpath.
+         if (atlasConfDir != null) {
+             final String confDirKey = "atlas.conf";
+             final Properties systemProps = System.getProperties();
+             systemProps.setProperty(confDirKey, atlasConfDir.getAbsolutePath());
+             logger.debug("{} has been set to: {}", confDirKey, systemProps.getProperty(confDirKey));
+         }
+
+         atlasClient = authN.createClient(baseUrls);
+     }
+ }
+
+ /**
+  * Utility method to delete unused types.
+  * Should be used during development or testing only.
+  * The definitions are first looked up by name and then passed to Atlas for deletion.
+  *
+  * @param typeNames names of the types to delete
+  * @throws AtlasServiceException if looking up or deleting the type definitions fails
+  */
+ void deleteTypeDefs(String ... typeNames) throws AtlasServiceException {
+     final AtlasTypesDef typesToDelete = getTypeDefs(typeNames);
+     try {
+         atlasClient.deleteAtlasTypeDefs(typesToDelete);
+     } catch (UniformInterfaceException e) {
+         final int status = e.getResponse().getStatus();
+         if (status != 204) {
+             throw e;
+         }
+         // 204 is a successful response.
+         // NOTE: However after executing this, Atlas should be restarted to work properly.
+         logger.info("Deleted type defs: {}", typesToDelete);
+     }
+ }
+
+ /**
+  * Checks whether every NiFi entity type listed in {@code ENTITIES} already
+  * has an entity definition in Atlas.
+  *
+  * @return True when required NiFi types are already created.
+  * @throws AtlasServiceException if fetching the type definitions from Atlas fails
+  */
+ public boolean isNiFiTypeDefsRegistered() throws AtlasServiceException {
+     final Set<String> typeNames = ENTITIES.keySet();
+     // Only the names are needed for the membership check. Collecting into a Set
+     // (instead of a name -> definition Map) also avoids the IllegalStateException
+     // that Collectors.toMap raises if the same name is returned more than once.
+     final Set<String> registeredNames = getTypeDefs(typeNames.toArray(new String[0])).getEntityDefs().stream()
+             .map(AtlasEntityDef::getName)
+             .collect(Collectors.toSet());
+     return registeredNames.containsAll(typeNames);
+ }
+
+ /**
+ * Create or update NiFi types in Atlas type system.
+ * @param update If false, doesn't perform anything if there is
existing type def for the name.
+ */
+ public void registerNiFiTypeDefs(boolean update) throws
AtlasServiceException {
+ final Set<String> typeNames = ENTITIES.keySet();
+ final Map<String, AtlasEntityDef> existingDefs =
getTypeDefs(typeNames.toArray(new
String[typeNames.size()])).getEntityDefs().stream()
+ .collect(Collectors.toMap(AtlasEntityDef::getName,
Function.identity()));
+
+
+ final AtomicBoolean shouldUpdate = new AtomicBoolean(false);
+
+ final AtlasTypesDef type = new AtlasTypesDef();
+
+ typeNames.stream().filter(typeName -> {
+ final AtlasEntityDef existingDef = existingDefs.get(typeName);
+ if (existingDef != null) {
+ // type is already defined.
+ if (!update) {
+ return false;
+ }
+ shouldUpdate.set(true);
+ }
+ return true;
+ }).forEach(typeName -> {
+ final NiFiTypes.EntityDefinition def = ENTITIES.get(typeName);
+
+ final AtlasEntityDef entity = new AtlasEntityDef();
+ type.getEntityDefs().add(entity);
+
+ entity.setName(typeName);
+
+ Set<String> superTypes = new HashSet<>();
+ List<AtlasAttributeDef> attributes = new ArrayList<>();
+
+ def.define(entity, superTypes, attributes);
+
+ entity.setSuperTypes(superTypes);
+ entity.setAttributeDefs(attributes);
+ });
+
+ // Create or Update.
+ final AtlasTypesDef atlasTypeDefsResult = shouldUpdate.get()
+ ? atlasClient.updateAtlasTypeDefs(type)
+ : atlasClient.createAtlasTypeDefs(type);
+ logger.debug("Result={}", atlasTypeDefsResult);
+ }
+
+ /**
+  * Looks up type definitions in Atlas, issuing one search per given name, and
+  * gathers the entity definitions of all results into a single AtlasTypesDef.
+  * Only entity definitions are carried over from the individual results.
+  *
+  * @param typeNames names of the types to look up
+  * @return a new AtlasTypesDef holding the entity definitions that were found
+  * @throws AtlasServiceException if a request to Atlas fails
+  */
+ private AtlasTypesDef getTypeDefs(String ... typeNames) throws AtlasServiceException {
+     final AtlasTypesDef collected = new AtlasTypesDef();
+     for (final String typeName : typeNames) {
+         final MultivaluedMap<String, String> nameFilter = new MultivaluedMapImpl();
+         nameFilter.add(SearchFilter.PARAM_NAME, typeName);
+         final AtlasTypesDef found = atlasClient.getAllTypeDefs(new SearchFilter(nameFilter));
+         collected.getEntityDefs().addAll(found.getEntityDefs());
+     }
+     logger.debug("typeDefs={}", collected);
+     return collected;
+ }
+
+ private Pattern FLOW_PATH_URL_PATTERN =
Pattern.compile("^http.+processGroupId=([0-9a-z\\-]+).*$");
+ /**
+  * Fetch existing NiFiFlow entity from Atlas.
+  * The flow is looked up by the qualified name built from the cluster name and the
+  * root process group id. Its queues, root input/output ports and flow paths are
+  * then resolved and copied into the returned object.
+  *
+  * @param rootProcessGroupId The id of a NiFi flow root process group.
+  * @param clusterName The cluster name of a flow.
+  * @return A NiFiFlow instance filled with retrieved data from Atlas, or null when the
+  *         search yields no entity. Status objects are left blank, e.g. ProcessorStatus.
+  * @throws AtlasServiceException Thrown if requesting to Atlas API failed, including when the flow is not found.
+  */
+ public NiFiFlow fetchNiFiFlow(String rootProcessGroupId, String clusterName) throws AtlasServiceException {
+     final String qualifiedName = AtlasUtils.toQualifiedName(clusterName, rootProcessGroupId);
+     final AtlasEntity.AtlasEntityWithExtInfo searchResult =
+             searchEntityDef(new AtlasObjectId(TYPE_NIFI_FLOW, ATTR_QUALIFIED_NAME, qualifiedName));
+
+     final AtlasEntity flowEntity = searchResult == null ? null : searchResult.getEntity();
+     if (flowEntity == null) {
+         return null;
+     }
+
+     // Scalar attributes of the flow itself.
+     final Map<String, Object> flowAttrs = flowEntity.getAttributes();
+     final NiFiFlow flow = new NiFiFlow(rootProcessGroupId);
+     flow.setExEntity(flowEntity);
+     flow.setFlowName(toStr(flowAttrs.get(ATTR_NAME)));
+     flow.setClusterName(clusterName);
+     flow.setUrl(toStr(flowAttrs.get(ATTR_URL)));
+     flow.setDescription(toStr(flowAttrs.get(ATTR_DESCRIPTION)));
+
+     // Referenced entities: queues first, then root input ports, then root output ports.
+     final Object queueRefs = flowEntity.getAttribute(ATTR_QUEUES);
+     flow.getQueues().putAll(toQualifiedNameIds(toAtlasObjectIds(queueRefs)));
+     final Object inputPortRefs = flowEntity.getAttribute(ATTR_INPUT_PORTS);
+     flow.getRootInputPortEntities().putAll(toQualifiedNameIds(toAtlasObjectIds(inputPortRefs)));
+     final Object outputPortRefs = flowEntity.getAttribute(ATTR_OUTPUT_PORTS);
+     flow.getRootOutputPortEntities().putAll(toQualifiedNameIds(toAtlasObjectIds(outputPortRefs)));
+
+     final Map<String, NiFiFlowPath> pathsById = flow.getFlowPaths();
+     final Map<AtlasObjectId, AtlasEntity> pathEntities =
+             toQualifiedNameIds(toAtlasObjectIds(flowAttrs.get(ATTR_FLOW_PATHS)));
+
+     for (final AtlasEntity pathEntity : pathEntities.values()) {
+         final String pathQualifiedName = toStr(pathEntity.getAttribute(ATTR_QUALIFIED_NAME));
+         final NiFiFlowPath path = new NiFiFlowPath(getComponentIdFromQualifiedName(pathQualifiedName));
+
+         // The group id is taken from the path URL, when there is one and it matches.
+         if (pathEntity.hasAttribute(ATTR_URL)) {
+             final String pathUrl = toStr(pathEntity.getAttribute(ATTR_URL));
+             final Matcher groupIdMatcher = FLOW_PATH_URL_PATTERN.matcher(pathUrl);
+             if (groupIdMatcher.matches()) {
+                 path.setGroupId(groupIdMatcher.group(1));
+             }
+         }
+
+         path.setExEntity(pathEntity);
+         path.setName(toStr(pathEntity.getAttribute(ATTR_NAME)));
+         final Object inputRefs = pathEntity.getAttribute(ATTR_INPUTS);
+         path.getInputs().addAll(toQualifiedNameIds(toAtlasObjectIds(inputRefs)).keySet());
+         final Object outputRefs = pathEntity.getAttribute(ATTR_OUTPUTS);
+         path.getOutputs().addAll(toQualifiedNameIds(toAtlasObjectIds(outputRefs)).keySet());
+         // Start tracking only after the path has been populated from Atlas.
+         path.startTrackingChanges(flow);
+
+         pathsById.put(path.getId(), path);
+     }
+
+     flow.startTrackingChanges();
+     return flow;
+ }
+
+ @SuppressWarnings("unchecked")
+ private List<AtlasObjectId> toAtlasObjectIds(Object _references) {
+ if (_references == null) {
+ return Collections.emptyList();
+ }
+ List<Map<String, Object>> references = (List<Map<String, Object>>)
_references;
+ return references.stream()
+ .map(ref -> new AtlasObjectId(toStr(ref.get(ATTR_GUID)),
toStr(ref.get(ATTR_TYPENAME)), ref))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * <p>AtlasObjectIds returned from Atlas have GUID, but do not have
qualifiedName, while ones created by the reporting task
+ * do not have GUID, but qualifiedName. AtlasObjectId.equals returns
false for this combination.
+ * In order to match ids correctly, this method fetches the actual entities for the given ids to get their qualifiedName attribute.</p>
+ *
+ * <p>Also, AtlasObjectIds returned from Atlas do not have entity state.
+ * If Atlas is configured to use soft-delete (default), deleted ids
are still returned.
+ * Fetched entities are used to determine whether an AtlasObjectId is
still active or deleted.
+ * Deleted entities will not be included in the result of this method.
+ * </p>
+ * @param ids to convert
+ * @return AtlasObjectIds with qualifiedName
+ */
+ private Map<AtlasObjectId, AtlasEntity>
toQualifiedNameIds(List<AtlasObjectId> ids) {
+ if (ids == null) {
+ return Collections.emptyMap();
+ }
+
+ return ids.stream().distinct().map(id -> {
+ try {
+ final AtlasEntity.AtlasEntityWithExtInfo entityExt =
searchEntityDef(id);
+ final AtlasEntity entity = entityExt.getEntity();
+ if (AtlasEntity.Status.DELETED.equals(entity.getStatus()))
{
+ return null;
+ }
+ final Map<String, Object> uniqueAttrs =
Collections.singletonMap(ATTR_QUALIFIED_NAME,
entity.getAttribute(ATTR_QUALIFIED_NAME));
+ return new Tuple<>(new AtlasObjectId(id.getGuid(),
id.getTypeName(), uniqueAttrs), entity);
+ } catch (AtlasServiceException e) {
+ return null;
--- End diff --
If an Exception is being thrown we should probably be at least logging it,
rather than just swallowing it, no?
---