This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 67fca68  NIFI-6910 - ReportLineageToAtlas - Added processor properties for connect timeout and read timeout.
67fca68 is described below

commit 67fca6832a528fc1bfe996f3d95a6ac250f6894c
Author: Tamas Palfy <[email protected]>
AuthorDate: Mon Dec 16 13:29:19 2019 +0100

    NIFI-6910 - ReportLineageToAtlas - Added processor properties for connect timeout and read timeout.
    
    NIFI-6910 - ReportLineageToAtlas - Capitalized Atlas connect and read timeout property display names.
    
    This closes #3936.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../nifi/atlas/reporting/ReportLineageToAtlas.java |  30 ++++-
 .../atlas/reporting/TestReportLineageToAtlas.java  | 124 ++++++++++++++++++++-
 2 files changed, 148 insertions(+), 6 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index fa1b998..80bad7a 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.ServiceLoader;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Stream;
 
@@ -120,6 +121,24 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor ATLAS_CONNECT_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("atlas-connect-timeout")
+            .displayName("Atlas Connect Timeout")
+            .description("Max wait time for connection to Atlas.")
+            .required(true)
+            .defaultValue("60 sec")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ATLAS_READ_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("atlas-read-timeout")
+            .displayName("Atlas Read Timeout")
+            .description("Max wait time for response from Atlas.")
+            .required(true)
+            .defaultValue("60 sec")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
     static final AllowableValue ATLAS_AUTHN_BASIC = new 
AllowableValue("basic", "Basic", "Use username and password.");
     static final AllowableValue ATLAS_AUTHN_KERBEROS = new 
AllowableValue("kerberos", "Kerberos", "Use Kerberos keytab file.");
     static final PropertyDescriptor ATLAS_AUTHN_METHOD = new 
PropertyDescriptor.Builder()
@@ -293,6 +312,8 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
             .build();
 
     private static final String ATLAS_PROPERTIES_FILENAME = 
"atlas-application.properties";
+    private static final String ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS = 
"atlas.client.connectTimeoutMSecs";
+    private static final String ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS = 
"atlas.client.readTimeoutMSecs";
     private static final String ATLAS_PROPERTY_CLUSTER_NAME = 
"atlas.cluster.name";
     private static final String ATLAS_PROPERTY_ENABLE_TLS = "atlas.enableTLS";
     private static final String ATLAS_KAFKA_PREFIX = "atlas.kafka.";
@@ -313,6 +334,8 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(ATLAS_URLS);
+        properties.add(ATLAS_CONNECT_TIMEOUT);
+        properties.add(ATLAS_READ_TIMEOUT);
         properties.add(ATLAS_AUTHN_METHOD);
         properties.add(ATLAS_USER);
         properties.add(ATLAS_PASSWORD);
@@ -521,12 +544,17 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
             throw new ProcessException("Default cluster name is not defined.");
         }
 
+        String atlasConnectTimeoutMs = 
context.getProperty(ATLAS_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()
 + "";
+        String atlasReadTimeoutMs = 
context.getProperty(ATLAS_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()
 + "";
+
         atlasAuthN = getAtlasAuthN(atlasAuthNMethod);
         atlasAuthN.configure(context);
 
         // Create Atlas configuration file if necessary.
         if (createAtlasConf) {
 
+            atlasProperties.put(ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS, 
atlasConnectTimeoutMs);
+            atlasProperties.put(ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS, 
atlasReadTimeoutMs);
             atlasProperties.put(ATLAS_PROPERTY_CLUSTER_NAME, 
defaultClusterName);
             atlasProperties.put(ATLAS_PROPERTY_ENABLE_TLS, 
String.valueOf(isAtlasApiSecure));
 
@@ -558,7 +586,7 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
      * In order to avoid authentication expiration issues (i.e. Kerberos 
ticket and DelegationToken expiration),
      * create Atlas client instance at every onTrigger execution.
      */
-    private NiFiAtlasClient createNiFiAtlasClient(ReportingContext context) {
+    protected NiFiAtlasClient createNiFiAtlasClient(ReportingContext context) {
         List<String> urls = new ArrayList<>();
         parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add);
         try {
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
index ae1d63d..18a4f37 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
@@ -16,31 +16,70 @@
  */
 package org.apache.nifi.atlas.reporting;
 
+import com.sun.jersey.api.client.Client;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockConfigurationContext;
 import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockPropertyValue;
 import org.apache.nifi.util.MockValidationContext;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
+import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_CONF_CREATE;
+import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_CONF_DIR;
+import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_CONNECT_TIMEOUT;
+import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_DEFAULT_CLUSTER_NAME;
 import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_NIFI_URL;
 import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_PASSWORD;
+import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_READ_TIMEOUT;
 import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_URLS;
 import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_USER;
+import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.KAFKA_BOOTSTRAP_SERVERS;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestReportLineageToAtlas {
 
     private final Logger logger = 
LoggerFactory.getLogger(TestReportLineageToAtlas.class);
 
+    private ReportLineageToAtlas testSubject;
+    private MockComponentLog componentLogger;
+    private ReportingInitializationContext initializationContext;
+    private ReportingContext reportingContext;
+
+    @Before
+    public void setUp() throws Exception {
+        testSubject = new ReportLineageToAtlas();
+        componentLogger = new MockComponentLog("reporting-task-id", 
testSubject);
+
+        initializationContext = mock(ReportingInitializationContext.class);
+        when(initializationContext.getLogger()).thenReturn(componentLogger);
+    }
+
     @Test
     public void validateAtlasUrls() throws Exception {
-        final ReportLineageToAtlas reportingTask = new ReportLineageToAtlas();
-        final MockProcessContext processContext = new 
MockProcessContext(reportingTask);
+        final MockProcessContext processContext = new 
MockProcessContext(testSubject);
         final MockValidationContext validationContext = new 
MockValidationContext(processContext);
 
         processContext.setProperty(ATLAS_NIFI_URL, 
"http://nifi.example.com:8080/nifi");
@@ -59,13 +98,13 @@ public class TestReportLineageToAtlas {
         };
 
         // Default setting.
-        assertResults.accept(reportingTask.validate(validationContext),
+        assertResults.accept(testSubject.validate(validationContext),
                 r -> assertTrue("Atlas URLs is required", !r.isValid()));
 
 
         // Invalid URL.
         processContext.setProperty(ATLAS_URLS, "invalid");
-        assertResults.accept(reportingTask.validate(validationContext),
+        assertResults.accept(testSubject.validate(validationContext),
                 r -> assertTrue("Atlas URLs is invalid", !r.isValid()));
 
         // Valid URL
@@ -82,8 +121,83 @@ public class TestReportLineageToAtlas {
 
         // Invalid and Valid URLs
         processContext.setProperty(ATLAS_URLS, "invalid, 
http://atlas2.example.com:21000");
-        assertResults.accept(reportingTask.validate(validationContext),
+        assertResults.accept(testSubject.validate(validationContext),
                 r -> assertTrue("Atlas URLs is invalid", !r.isValid()));
     }
 
+    @Test
+    public void testDefaultConnectAndReadTimeout() throws Exception {
+        // GIVEN
+        Map<PropertyDescriptor, String> properties = new HashMap<>();
+
+        // WHEN
+        // THEN
+        testConnectAndReadTimeout(properties, 60000, 60000);
+    }
+
+    @Test
+    public void testSetConnectAndReadTimeout() throws Exception {
+        // GIVEN
+        int expectedConnectTimeoutMs = 10000;
+        int expectedReadTimeoutMs = 5000;
+
+        String atlasConfDir = "target/atlasConfDir";
+        File directory = new File(atlasConfDir);
+        if (!directory.exists()) {
+            directory.mkdirs();
+        }
+
+        Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(ATLAS_CONNECT_TIMEOUT, (expectedConnectTimeoutMs / 
1000) + " sec");
+        properties.put(ATLAS_READ_TIMEOUT, (expectedReadTimeoutMs / 1000) + " 
sec");
+
+        properties.put(ATLAS_CONF_DIR, atlasConfDir);
+        properties.put(ATLAS_CONF_CREATE, "true");
+        properties.put(ATLAS_DEFAULT_CLUSTER_NAME, "defaultClusterName");
+        properties.put(KAFKA_BOOTSTRAP_SERVERS, "http://localhost:6667");
+
+        // WHEN
+        // THEN
+        testConnectAndReadTimeout(properties, expectedConnectTimeoutMs, 
expectedReadTimeoutMs);
+    }
+
+    private void testConnectAndReadTimeout(Map<PropertyDescriptor, String> 
properties, Integer expectedConnectTimeout, Integer expectedReadTimeout) throws 
Exception {
+        // GIVEN
+        properties.put(ATLAS_NIFI_URL, "http://localhost:8080/nifi");
+        properties.put(ATLAS_URLS, "http://localhost:27000");
+        properties.put(ATLAS_USER, "admin");
+        properties.put(ATLAS_PASSWORD, "admin123");
+
+        reportingContext = mock(ReportingContext.class);
+        when(reportingContext.getProperties()).thenReturn(properties);
+        when(reportingContext.getProperty(any())).then(invocation -> new 
MockPropertyValue(properties.get(invocation.getArguments()[0])));
+
+        ConfigurationContext configurationContext = new 
MockConfigurationContext(properties, null);
+
+        testSubject.initialize(initializationContext);
+        testSubject.setup(configurationContext);
+
+        // WHEN
+        NiFiAtlasClient niFiAtlasClient = 
testSubject.createNiFiAtlasClient(reportingContext);
+
+        // THEN
+        Field fieldAtlasClient = 
niFiAtlasClient.getClass().getDeclaredField("atlasClient");
+        fieldAtlasClient.setAccessible(true);
+        AtlasClientV2 atlasClient = (AtlasClientV2) 
fieldAtlasClient.get(niFiAtlasClient);
+
+        Field fieldAtlasClientContext = 
atlasClient.getClass().getSuperclass().getDeclaredField("atlasClientContext");
+        fieldAtlasClientContext.setAccessible(true);
+        Object atlasClientContext = fieldAtlasClientContext.get(atlasClient);
+
+        Method getClient = 
atlasClientContext.getClass().getMethod("getClient");
+        getClient.setAccessible(true);
+        Client jerseyClient = (Client) getClient.invoke(atlasClientContext);
+        Map<String, Object> jerseyProperties = jerseyClient.getProperties();
+
+        Integer actualConnectTimeout = (Integer) 
jerseyProperties.get("com.sun.jersey.client.property.connectTimeout");
+        Integer actualReadTimeout = (Integer) 
jerseyProperties.get("com.sun.jersey.client.property.readTimeout");
+
+        assertEquals(expectedConnectTimeout, actualConnectTimeout);
+        assertEquals(expectedReadTimeout, actualReadTimeout);
+    }
 }

Reply via email to