This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 67fca68 NIFI-6910 - ReportLineageToAtlas - Added processor properties
for connect timeout and read timeout.
67fca68 is described below
commit 67fca6832a528fc1bfe996f3d95a6ac250f6894c
Author: Tamas Palfy <[email protected]>
AuthorDate: Mon Dec 16 13:29:19 2019 +0100
NIFI-6910 - ReportLineageToAtlas - Added processor properties for connect
timeout and read timeout.
NIFI-6910 - ReportLineageToAtlas - Capitalized Atlas connect and read
timeout property display names.
This closes #3936.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../nifi/atlas/reporting/ReportLineageToAtlas.java | 30 ++++-
.../atlas/reporting/TestReportLineageToAtlas.java | 124 ++++++++++++++++++++-
2 files changed, 148 insertions(+), 6 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index fa1b998..80bad7a 100644
---
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
@@ -120,6 +121,24 @@ public class ReportLineageToAtlas extends
AbstractReportingTask {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
+ public static final PropertyDescriptor ATLAS_CONNECT_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("atlas-connect-timeout")
+ .displayName("Atlas Connect Timeout")
+ .description("Max wait time for connection to Atlas.")
+ .required(true)
+ .defaultValue("60 sec")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor ATLAS_READ_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("atlas-read-timeout")
+ .displayName("Atlas Read Timeout")
+ .description("Max wait time for response from Atlas.")
+ .required(true)
+ .defaultValue("60 sec")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
static final AllowableValue ATLAS_AUTHN_BASIC = new
AllowableValue("basic", "Basic", "Use username and password.");
static final AllowableValue ATLAS_AUTHN_KERBEROS = new
AllowableValue("kerberos", "Kerberos", "Use Kerberos keytab file.");
static final PropertyDescriptor ATLAS_AUTHN_METHOD = new
PropertyDescriptor.Builder()
@@ -293,6 +312,8 @@ public class ReportLineageToAtlas extends
AbstractReportingTask {
.build();
private static final String ATLAS_PROPERTIES_FILENAME =
"atlas-application.properties";
+ private static final String ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS =
"atlas.client.connectTimeoutMSecs";
+ private static final String ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS =
"atlas.client.readTimeoutMSecs";
private static final String ATLAS_PROPERTY_CLUSTER_NAME =
"atlas.cluster.name";
private static final String ATLAS_PROPERTY_ENABLE_TLS = "atlas.enableTLS";
private static final String ATLAS_KAFKA_PREFIX = "atlas.kafka.";
@@ -313,6 +334,8 @@ public class ReportLineageToAtlas extends
AbstractReportingTask {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ATLAS_URLS);
+ properties.add(ATLAS_CONNECT_TIMEOUT);
+ properties.add(ATLAS_READ_TIMEOUT);
properties.add(ATLAS_AUTHN_METHOD);
properties.add(ATLAS_USER);
properties.add(ATLAS_PASSWORD);
@@ -521,12 +544,17 @@ public class ReportLineageToAtlas extends
AbstractReportingTask {
throw new ProcessException("Default cluster name is not defined.");
}
+ String atlasConnectTimeoutMs =
context.getProperty(ATLAS_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()
+ "";
+ String atlasReadTimeoutMs =
context.getProperty(ATLAS_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()
+ "";
+
atlasAuthN = getAtlasAuthN(atlasAuthNMethod);
atlasAuthN.configure(context);
// Create Atlas configuration file if necessary.
if (createAtlasConf) {
+ atlasProperties.put(ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS,
atlasConnectTimeoutMs);
+ atlasProperties.put(ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS,
atlasReadTimeoutMs);
atlasProperties.put(ATLAS_PROPERTY_CLUSTER_NAME,
defaultClusterName);
atlasProperties.put(ATLAS_PROPERTY_ENABLE_TLS,
String.valueOf(isAtlasApiSecure));
@@ -558,7 +586,7 @@ public class ReportLineageToAtlas extends
AbstractReportingTask {
* In order to avoid authentication expiration issues (i.e. Kerberos
ticket and DelegationToken expiration),
* create Atlas client instance at every onTrigger execution.
*/
- private NiFiAtlasClient createNiFiAtlasClient(ReportingContext context) {
+ protected NiFiAtlasClient createNiFiAtlasClient(ReportingContext context) {
List<String> urls = new ArrayList<>();
parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add);
try {
diff --git
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
index ae1d63d..18a4f37 100644
---
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
+++
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
@@ -16,31 +16,70 @@
*/
package org.apache.nifi.atlas.reporting;
+import com.sun.jersey.api.client.Client;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockPropertyValue;
import org.apache.nifi.util.MockValidationContext;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import static
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_CONF_CREATE;
+import static
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_CONF_DIR;
+import static
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_CONNECT_TIMEOUT;
+import static
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_DEFAULT_CLUSTER_NAME;
import static
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_NIFI_URL;
import static
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_PASSWORD;
+import static
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_READ_TIMEOUT;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_URLS;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_USER;
+import static
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.KAFKA_BOOTSTRAP_SERVERS;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestReportLineageToAtlas {
private final Logger logger =
LoggerFactory.getLogger(TestReportLineageToAtlas.class);
+ private ReportLineageToAtlas testSubject;
+ private MockComponentLog componentLogger;
+ private ReportingInitializationContext initializationContext;
+ private ReportingContext reportingContext;
+
+ @Before
+ public void setUp() throws Exception {
+ testSubject = new ReportLineageToAtlas();
+ componentLogger = new MockComponentLog("reporting-task-id",
testSubject);
+
+ initializationContext = mock(ReportingInitializationContext.class);
+ when(initializationContext.getLogger()).thenReturn(componentLogger);
+ }
+
@Test
public void validateAtlasUrls() throws Exception {
- final ReportLineageToAtlas reportingTask = new ReportLineageToAtlas();
- final MockProcessContext processContext = new
MockProcessContext(reportingTask);
+ final MockProcessContext processContext = new
MockProcessContext(testSubject);
final MockValidationContext validationContext = new
MockValidationContext(processContext);
processContext.setProperty(ATLAS_NIFI_URL,
"http://nifi.example.com:8080/nifi");
@@ -59,13 +98,13 @@ public class TestReportLineageToAtlas {
};
// Default setting.
- assertResults.accept(reportingTask.validate(validationContext),
+ assertResults.accept(testSubject.validate(validationContext),
r -> assertTrue("Atlas URLs is required", !r.isValid()));
// Invalid URL.
processContext.setProperty(ATLAS_URLS, "invalid");
- assertResults.accept(reportingTask.validate(validationContext),
+ assertResults.accept(testSubject.validate(validationContext),
r -> assertTrue("Atlas URLs is invalid", !r.isValid()));
// Valid URL
@@ -82,8 +121,83 @@ public class TestReportLineageToAtlas {
// Invalid and Valid URLs
processContext.setProperty(ATLAS_URLS, "invalid,
http://atlas2.example.com:21000");
- assertResults.accept(reportingTask.validate(validationContext),
+ assertResults.accept(testSubject.validate(validationContext),
r -> assertTrue("Atlas URLs is invalid", !r.isValid()));
}
+ @Test
+ public void testDefaultConnectAndReadTimeout() throws Exception {
+ // GIVEN
+ Map<PropertyDescriptor, String> properties = new HashMap<>();
+
+ // WHEN
+ // THEN
+ testConnectAndReadTimeout(properties, 60000, 60000);
+ }
+
+ @Test
+ public void testSetConnectAndReadTimeout() throws Exception {
+ // GIVEN
+ int expectedConnectTimeoutMs = 10000;
+ int expectedReadTimeoutMs = 5000;
+
+ String atlasConfDir = "target/atlasConfDir";
+ File directory = new File(atlasConfDir);
+ if (!directory.exists()) {
+ directory.mkdirs();
+ }
+
+ Map<PropertyDescriptor, String> properties = new HashMap<>();
+ properties.put(ATLAS_CONNECT_TIMEOUT, (expectedConnectTimeoutMs /
1000) + " sec");
+ properties.put(ATLAS_READ_TIMEOUT, (expectedReadTimeoutMs / 1000) + "
sec");
+
+ properties.put(ATLAS_CONF_DIR, atlasConfDir);
+ properties.put(ATLAS_CONF_CREATE, "true");
+ properties.put(ATLAS_DEFAULT_CLUSTER_NAME, "defaultClusterName");
+ properties.put(KAFKA_BOOTSTRAP_SERVERS, "http://localhost:6667");
+
+ // WHEN
+ // THEN
+ testConnectAndReadTimeout(properties, expectedConnectTimeoutMs,
expectedReadTimeoutMs);
+ }
+
+ private void testConnectAndReadTimeout(Map<PropertyDescriptor, String>
properties, Integer expectedConnectTimeout, Integer expectedReadTimeout) throws
Exception {
+ // GIVEN
+ properties.put(ATLAS_NIFI_URL, "http://localhost:8080/nifi");
+ properties.put(ATLAS_URLS, "http://localhost:27000");
+ properties.put(ATLAS_USER, "admin");
+ properties.put(ATLAS_PASSWORD, "admin123");
+
+ reportingContext = mock(ReportingContext.class);
+ when(reportingContext.getProperties()).thenReturn(properties);
+ when(reportingContext.getProperty(any())).then(invocation -> new
MockPropertyValue(properties.get(invocation.getArguments()[0])));
+
+ ConfigurationContext configurationContext = new
MockConfigurationContext(properties, null);
+
+ testSubject.initialize(initializationContext);
+ testSubject.setup(configurationContext);
+
+ // WHEN
+ NiFiAtlasClient niFiAtlasClient =
testSubject.createNiFiAtlasClient(reportingContext);
+
+ // THEN
+ Field fieldAtlasClient =
niFiAtlasClient.getClass().getDeclaredField("atlasClient");
+ fieldAtlasClient.setAccessible(true);
+ AtlasClientV2 atlasClient = (AtlasClientV2)
fieldAtlasClient.get(niFiAtlasClient);
+
+ Field fieldAtlasClientContext =
atlasClient.getClass().getSuperclass().getDeclaredField("atlasClientContext");
+ fieldAtlasClientContext.setAccessible(true);
+ Object atlasClientContext = fieldAtlasClientContext.get(atlasClient);
+
+ Method getClient =
atlasClientContext.getClass().getMethod("getClient");
+ getClient.setAccessible(true);
+ Client jerseyClient = (Client) getClient.invoke(atlasClientContext);
+ Map<String, Object> jerseyProperties = jerseyClient.getProperties();
+
+ Integer actualConnectTimeout = (Integer)
jerseyProperties.get("com.sun.jersey.client.property.connectTimeout");
+ Integer actualReadTimeout = (Integer)
jerseyProperties.get("com.sun.jersey.client.property.readTimeout");
+
+ assertEquals(expectedConnectTimeout, actualConnectTimeout);
+ assertEquals(expectedReadTimeout, actualReadTimeout);
+ }
}