This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 14e6dc3 NIFI-8405: Added debug logging around how long it takes to
establish connections/query dns/read and write headers and body when
replicating requests; added additional timing around Ranger audits and
authorizations and monitoring of long-running tasks because those run often and
frequently show up in the logs at the same time as the long requests
14e6dc3 is described below
commit 14e6dc3dc6533fc6189a301cf720c0ee57650729
Author: Mark Payne <[email protected]>
AuthorDate: Thu Apr 8 13:54:50 2021 -0400
NIFI-8405: Added debug logging around how long it takes to establish
connections/query dns/read and write headers and body when replicating
requests; added additional timing around Ranger audits and authorizations and
monitoring of long-running tasks because those run often and frequently show up
in the logs at the same time as the long requests
This closes #4983
Signed-off-by: David Handermann <[email protected]>
---
.../http/replication/okhttp/CallEventListener.java | 161 +++++++++++++++++++
.../okhttp/OkHttpReplicationClient.java | 48 +++---
.../okhttp/RequestReplicationEventListener.java | 174 +++++++++++++++++++++
.../components/monitor/LongRunningTaskMonitor.java | 5 +-
.../components/LongRunningTaskMonitorTest.java | 29 ++--
.../ranger/authorization/RangerNiFiAuthorizer.java | 9 +-
6 files changed, 384 insertions(+), 42 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/CallEventListener.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/CallEventListener.java
new file mode 100644
index 0000000..2ca8596
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/CallEventListener.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.replication.okhttp;
+
+import okhttp3.Call;
+
+import java.net.SocketAddress;
+import java.text.NumberFormat;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Accumulates timing measurements for the individual phases of a single OkHttp {@link Call}:
+ * DNS lookups, connection establishment, TLS initialization, writing the request headers and body,
+ * reading the response headers and body, and the call as a whole.
+ *
+ * <p>Every phase is tracked by a {@code Timing}. A duration is recorded only when a phase has been both
+ * started and ended, so a phase that never ran, or that was started but never finished, is reported as
+ * 0 nanos instead of a meaningless difference. If a phase runs more than once for the same call, the
+ * durations are summed.</p>
+ *
+ * <p>This class performs no synchronization of its own.</p>
+ */
+public class CallEventListener {
+    private final Call call;
+    private final Map<String, Timing> dnsTimings = new HashMap<>();
+    private final Map<String, Timing> establishConnectionTiming = new HashMap<>();
+    private final Timing callTiming = new Timing("call");
+    private final Timing responseBodyTiming = new Timing("responseBody");
+    private final Timing responseHeaderTiming = new Timing("responseHeader");
+    private final Timing requestHeaderTiming = new Timing("requestHeader");
+    private final Timing requestBodyTiming = new Timing("requestBody");
+    private final Timing secureConnectTiming = new Timing("secureConnect");
+
+    /**
+     * @param call the call whose events are being timed; its request URL is included in {@link #toString()}
+     */
+    public CallEventListener(final Call call) {
+        this.call = call;
+    }
+
+    public void callStart() {
+        callTiming.start();
+    }
+
+    public void callEnd() {
+        callTiming.end();
+    }
+
+    /**
+     * Marks the start of a DNS lookup for the given domain name.
+     *
+     * @param domainName the name being resolved; used as the key under which the timing is kept
+     */
+    public void dnsStart(final String domainName) {
+        dnsTimings.computeIfAbsent(domainName, Timing::new).start();
+    }
+
+    /**
+     * Marks the end of a DNS lookup for the given domain name.
+     *
+     * @param domainName the name that was resolved
+     */
+    public void dnsEnd(final String domainName) {
+        dnsTimings.computeIfAbsent(domainName, Timing::new).end();
+    }
+
+    public void responseBodyStart() {
+        responseBodyTiming.start();
+    }
+
+    public void responseBodyEnd() {
+        responseBodyTiming.end();
+    }
+
+    public void responseHeaderStart() {
+        responseHeaderTiming.start();
+    }
+
+    public void responseHeaderEnd() {
+        responseHeaderTiming.end();
+    }
+
+    public void requestHeaderStart() {
+        requestHeaderTiming.start();
+    }
+
+    public void requestHeaderEnd() {
+        requestHeaderTiming.end();
+    }
+
+    public void requestBodyStart() {
+        requestBodyTiming.start();
+    }
+
+    public void requestBodyEnd() {
+        requestBodyTiming.end();
+    }
+
+    /**
+     * Marks the start of establishing a connection to the given address.
+     *
+     * @param address the address being connected to; its string form is the key for the timing
+     */
+    public void connectStart(final SocketAddress address) {
+        // String.valueOf tolerates a null address instead of throwing from inside an event callback
+        establishConnectionTiming.computeIfAbsent(String.valueOf(address), Timing::new).start();
+    }
+
+    /**
+     * Marks the point at which a connection to the given address became available to the call.
+     * If no matching {@link #connectStart(SocketAddress)} was seen for the same address string,
+     * the entry is still created but reports 0 nanos.
+     *
+     * @param address the remote address of the acquired connection
+     */
+    public void connectionAcquired(final SocketAddress address) {
+        establishConnectionTiming.computeIfAbsent(String.valueOf(address), Timing::new).end();
+    }
+
+    public void secureConnectStart() {
+        secureConnectTiming.start();
+    }
+
+    public void secureConnectEnd() {
+        secureConnectTiming.end();
+    }
+
+    public Call getCall() {
+        return call;
+    }
+
+    /**
+     * @return a single-line summary of the request URL and the nanoseconds recorded for each phase
+     */
+    @Override
+    public String toString() {
+        final NumberFormat numberFormat = NumberFormat.getInstance();
+
+        return "CallEventListener{" +
+            "url=" + call.request().url() +
+            ", dnsTimings=" + dnsTimings.values() +
+            ", establishConnectionTiming=" + establishConnectionTiming.values() +
+            ", tlsInitialization=" + numberFormat.format(secureConnectTiming.getNanos()) + " nanos" +
+            ", writeRequestHeaders=" + numberFormat.format(requestHeaderTiming.getNanos()) + " nanos" +
+            ", writeRequestBody=" + numberFormat.format(requestBodyTiming.getNanos()) + " nanos" +
+            ", readResponseHeaders=" + numberFormat.format(responseHeaderTiming.getNanos()) + " nanos" +
+            ", readResponseBody=" + numberFormat.format(responseBodyTiming.getNanos()) + " nanos" +
+            ", callTime=" + numberFormat.format(callTiming.getNanos()) + " nanos" +
+            '}';
+    }
+
+    /**
+     * Accumulated elapsed time for one named phase or address.
+     */
+    private static class Timing {
+        private final String address;
+        private long start;
+        // Explicit flag rather than testing "start > 0": System.nanoTime() has an arbitrary origin
+        // and may legitimately return zero or a negative value.
+        private boolean running;
+        private long nanos;
+
+        public Timing(final String address) {
+            this.address = address;
+        }
+
+        public String getAddress() {
+            return address;
+        }
+
+        public long getNanos() {
+            return nanos;
+        }
+
+        public void start() {
+            start = System.nanoTime();
+            running = true;
+        }
+
+        public void end() {
+            // Ignore an end without a preceding start, and a second end for the same start
+            if (running) {
+                nanos += (System.nanoTime() - start);
+                running = false;
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "{address=" + address + ", nanos=" + NumberFormat.getInstance().format(nanos) + "}";
+        }
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
index 72d0ffb..cb53c22 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
@@ -21,29 +21,6 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonInclude.Value;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.zip.GZIPInputStream;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.X509TrustManager;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.MultivaluedHashMap;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
import okhttp3.Call;
import okhttp3.ConnectionPool;
import okhttp3.Headers;
@@ -66,6 +43,30 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StreamUtils;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.zip.GZIPInputStream;
+
public class OkHttpReplicationClient implements HttpReplicationClient {
private static final Logger logger =
LoggerFactory.getLogger(OkHttpReplicationClient.class);
private static final Set<String> gzipEncodings = Stream.of("gzip",
"x-gzip").collect(Collectors.toSet());
@@ -317,6 +318,7 @@ public class OkHttpReplicationClient implements
HttpReplicationClient {
okHttpClientBuilder.followRedirects(true);
final int connectionPoolSize =
properties.getClusterNodeMaxConcurrentRequests();
okHttpClientBuilder.connectionPool(new
ConnectionPool(connectionPoolSize, 5, TimeUnit.MINUTES));
+ okHttpClientBuilder.eventListener(new
RequestReplicationEventListener());
// Apply the TLS configuration, if present
try {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/RequestReplicationEventListener.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/RequestReplicationEventListener.java
new file mode 100644
index 0000000..209a10b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/RequestReplicationEventListener.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.replication.okhttp;
+
+import okhttp3.Call;
+import okhttp3.Connection;
+import okhttp3.EventListener;
+import okhttp3.Handshake;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * OkHttp {@link EventListener} that keeps one {@link CallEventListener} per in-flight {@link Call},
+ * forwards the lifecycle events of that call to it, and writes the collected timing information to the
+ * debug log when the call ends or fails.
+ */
+public class RequestReplicationEventListener extends EventListener {
+    private static final Logger logger = LoggerFactory.getLogger(RequestReplicationEventListener.class);
+
+    private final ConcurrentMap<Call, CallEventListener> eventListeners = new ConcurrentHashMap<>();
+
+    /**
+     * Returns the timing collector for the given call, creating and registering one if none exists yet.
+     */
+    private CallEventListener getListener(final Call call) {
+        return eventListeners.computeIfAbsent(call, CallEventListener::new);
+    }
+
+    /**
+     * Stops tracking the given call and logs its timing information. The collector is removed from the
+     * map in a single step, so a completion event for a call that is not (or no longer) tracked is
+     * ignored instead of creating a fresh collector that has no start time.
+     */
+    private void completeCall(final Call call) {
+        final CallEventListener callListener = eventListeners.remove(call);
+        if (callListener == null) {
+            return;
+        }
+
+        callListener.callEnd();
+        logTimingInfo(callListener);
+    }
+
+    @Override
+    public void dnsStart(@NotNull final Call call, @NotNull final String domainName) {
+        super.dnsStart(call, domainName);
+        getListener(call).dnsStart(domainName);
+    }
+
+    @Override
+    public void dnsEnd(@NotNull final Call call, @NotNull final String domainName, @NotNull final List<InetAddress> inetAddressList) {
+        super.dnsEnd(call, domainName, inetAddressList);
+        getListener(call).dnsEnd(domainName);
+    }
+
+    @Override
+    public void callStart(@NotNull final Call call) {
+        super.callStart(call);
+        getListener(call).callStart();
+    }
+
+    @Override
+    public void callEnd(@NotNull final Call call) {
+        super.callEnd(call);
+        completeCall(call);
+    }
+
+    @Override
+    public void callFailed(@NotNull final Call call, @NotNull final IOException ioe) {
+        super.callFailed(call, ioe);
+        completeCall(call);
+    }
+
+    @Override
+    public void responseBodyStart(@NotNull final Call call) {
+        super.responseBodyStart(call);
+        getListener(call).responseBodyStart();
+    }
+
+    @Override
+    public void responseBodyEnd(@NotNull final Call call, final long byteCount) {
+        super.responseBodyEnd(call, byteCount);
+        getListener(call).responseBodyEnd();
+    }
+
+    @Override
+    public void responseFailed(@NotNull final Call call, @NotNull final IOException ioe) {
+        super.responseFailed(call, ioe);
+        // A failed response is recorded as the end of the response body phase
+        getListener(call).responseBodyEnd();
+    }
+
+    @Override
+    public void responseHeadersStart(@NotNull final Call call) {
+        super.responseHeadersStart(call);
+        getListener(call).responseHeaderStart();
+    }
+
+    @Override
+    public void responseHeadersEnd(@NotNull final Call call, @NotNull final Response response) {
+        super.responseHeadersEnd(call, response);
+        getListener(call).responseHeaderEnd();
+    }
+
+    @Override
+    public void requestHeadersStart(@NotNull final Call call) {
+        super.requestHeadersStart(call);
+        getListener(call).requestHeaderStart();
+    }
+
+    @Override
+    public void requestHeadersEnd(@NotNull final Call call, @NotNull final Request request) {
+        super.requestHeadersEnd(call, request);
+        getListener(call).requestHeaderEnd();
+    }
+
+    @Override
+    public void requestBodyStart(@NotNull final Call call) {
+        super.requestBodyStart(call);
+        getListener(call).requestBodyStart();
+    }
+
+    @Override
+    public void requestBodyEnd(@NotNull final Call call, final long byteCount) {
+        super.requestBodyEnd(call, byteCount);
+        getListener(call).requestBodyEnd();
+    }
+
+    @Override
+    public void requestFailed(@NotNull final Call call, @NotNull final IOException ioe) {
+        super.requestFailed(call, ioe);
+        // A failed request is recorded as the end of the request body phase
+        getListener(call).requestBodyEnd();
+    }
+
+    @Override
+    public void connectStart(@NotNull final Call call, @NotNull final InetSocketAddress inetSocketAddress, @NotNull final Proxy proxy) {
+        super.connectStart(call, inetSocketAddress, proxy);
+        getListener(call).connectStart(inetSocketAddress);
+    }
+
+    @Override
+    public void connectionAcquired(@NotNull final Call call, @NotNull final Connection connection) {
+        super.connectionAcquired(call, connection);
+        getListener(call).connectionAcquired(connection.socket().getRemoteSocketAddress());
+    }
+
+    @Override
+    public void secureConnectStart(@NotNull final Call call) {
+        super.secureConnectStart(call);
+        getListener(call).secureConnectStart();
+    }
+
+    @Override
+    public void secureConnectEnd(@NotNull final Call call, @Nullable final Handshake handshake) {
+        super.secureConnectEnd(call, handshake);
+        getListener(call).secureConnectEnd();
+    }
+
+    /**
+     * Writes the collected timings to the debug log; the collector is passed as a log argument rather
+     * than being converted to a string here.
+     */
+    private void logTimingInfo(final CallEventListener eventListener) {
+        logger.debug("Timing information {}", eventListener);
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
index a8368ae..fc03d91 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
@@ -26,6 +26,7 @@ import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.text.NumberFormat;
import java.util.List;
public class LongRunningTaskMonitor implements Runnable {
@@ -45,6 +46,7 @@ public class LongRunningTaskMonitor implements Runnable {
@Override
public void run() {
getLogger().debug("Checking long running processor tasks...");
+ final long start = System.nanoTime();
int activeThreadCount = 0;
int longRunningThreadCount = 0;
@@ -73,7 +75,8 @@ public class LongRunningTaskMonitor implements Runnable {
}
}
- getLogger().info("Active threads: {}; Long running threads: {}",
activeThreadCount, longRunningThreadCount);
+ final long nanos = System.nanoTime() - start;
+ getLogger().info("Active threads: {}; Long running threads: {}; time
to check: {} nanos", activeThreadCount, longRunningThreadCount,
NumberFormat.getInstance().format(nanos));
}
@VisibleForTesting
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/LongRunningTaskMonitorTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/LongRunningTaskMonitorTest.java
index 78d26f3..9477978 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/LongRunningTaskMonitorTest.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/LongRunningTaskMonitorTest.java
@@ -32,11 +32,11 @@ import org.slf4j.Logger;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class LongRunningTaskMonitorTest {
@@ -45,7 +45,6 @@ public class LongRunningTaskMonitorTest {
@Test
public void test() {
- // GIVEN
ThreadDetails threadDetails = mock(ThreadDetails.class);
ActiveThreadInfo activeThreadInfo11 =
mockActiveThreadInfo("Thread-11", 60_000);
@@ -82,12 +81,8 @@ public class LongRunningTaskMonitorTest {
}
};
- // WHEN
longRunningTaskMonitor.run();
- // THEN
- verify(longRunningTaskMonitorLogger).debug("Checking long running
processor tasks...");
-
ArgumentCaptor<String> logMessages =
ArgumentCaptor.forClass(String.class);
verify(longRunningTaskMonitorLogger,
times(2)).warn(logMessages.capture());
assertEquals("Long running task detected on processor
[id=Processor-1-ID, name=Processor-1-Name, type=Processor-1-Type]. Task time:
60 seconds. Stack trace:\n" + STACKTRACE,
@@ -97,18 +92,18 @@ public class LongRunningTaskMonitorTest {
ArgumentCaptor<String> controllerBulletinMessages =
ArgumentCaptor.forClass(String.class);
verify(eventReporter, times(2)).reportEvent(eq(Severity.WARNING),
eq("Long Running Task"), controllerBulletinMessages.capture());
- assertEquals("Processor with ID Processor-1-ID, Name Processor-1-Name
and Type Processor-1-Type has a task that has been running for 60 seconds
(thread name: Thread-12).",
- controllerBulletinMessages.getAllValues().get(0));
- assertEquals("Processor with ID Processor-2-ID, Name Processor-2-Name
and Type Processor-2-Type has a task that has been running for 1,000 seconds
(thread name: Thread-21).",
- controllerBulletinMessages.getAllValues().get(1));
-
- verify(processorLogger1).warn("The processor has a task that has been
running for 60 seconds (thread name: Thread-12).");
-
- verify(processorLogger2).warn("The processor has a task that has been
running for 1,000 seconds (thread name: Thread-21).");
-
- verify(longRunningTaskMonitorLogger).info("Active threads: {}; Long
running threads: {}", 4, 2);
- verifyNoMoreInteractions(longRunningTaskMonitorLogger, eventReporter,
processorLogger1, processorLogger2);
+ final String firstBulletinMessage =
controllerBulletinMessages.getAllValues().get(0);
+ assertTrue(firstBulletinMessage.contains("Processor-1-ID"));
+ assertTrue(firstBulletinMessage.contains("Processor-1-Type"));
+ assertTrue(firstBulletinMessage.contains("Processor-1-Name"));
+ assertTrue(firstBulletinMessage.contains("Thread-12"));
+
+ final String secondBulletinMessage =
controllerBulletinMessages.getAllValues().get(1);
+ assertTrue(secondBulletinMessage.contains("Processor-2-ID"));
+ assertTrue(secondBulletinMessage.contains("Processor-2-Type"));
+ assertTrue(secondBulletinMessage.contains("Processor-2-Name"));
+ assertTrue(secondBulletinMessage.contains("Thread-21"));
}
private ActiveThreadInfo mockActiveThreadInfo(String threadName, long
activeMillis) {
diff --git
a/nifi-nar-bundles/nifi-ranger-bundle/nifi-ranger-plugin/src/main/java/org/apache/nifi/ranger/authorization/RangerNiFiAuthorizer.java
b/nifi-nar-bundles/nifi-ranger-bundle/nifi-ranger-plugin/src/main/java/org/apache/nifi/ranger/authorization/RangerNiFiAuthorizer.java
index 4428f38..93c956d 100644
---
a/nifi-nar-bundles/nifi-ranger-bundle/nifi-ranger-plugin/src/main/java/org/apache/nifi/ranger/authorization/RangerNiFiAuthorizer.java
+++
b/nifi-nar-bundles/nifi-ranger-bundle/nifi-ranger-plugin/src/main/java/org/apache/nifi/ranger/authorization/RangerNiFiAuthorizer.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.MalformedURLException;
+import java.text.NumberFormat;
import java.util.Date;
import java.util.Map;
import java.util.Set;
@@ -55,7 +56,6 @@ import java.util.WeakHashMap;
* Authorizer implementation that uses Apache Ranger to make authorization
decisions.
*/
public class RangerNiFiAuthorizer implements Authorizer, AuthorizationAuditor {
-
private static final Logger logger =
LoggerFactory.getLogger(RangerNiFiAuthorizer.class);
static final String RANGER_AUDIT_PATH_PROP = "Ranger Audit Config Path";
@@ -79,6 +79,7 @@ public class RangerNiFiAuthorizer implements Authorizer,
AuthorizationAuditor {
private volatile String rangerAdminIdentity = null;
private volatile boolean rangerKerberosEnabled = false;
private volatile NiFiProperties nifiProperties;
+ private final NumberFormat numberFormat = NumberFormat.getInstance();
@Override
public void initialize(AuthorizerInitializationContext
initializationContext) throws AuthorizerCreationException {
@@ -177,7 +178,10 @@ public class RangerNiFiAuthorizer implements Authorizer,
AuthorizationAuditor {
rangerRequest.setClientIPAddress(clientIp);
}
+ final long authStart = System.nanoTime();
final RangerAccessResult result =
nifiPlugin.isAccessAllowed(rangerRequest);
+ final long authNanos = System.nanoTime() - authStart;
+ logger.debug("Performed authorization against Ranger for Resource ID
{}, Identity {} in {} nanos", resourceIdentifier, identity,
numberFormat.format(authNanos));
// store the result for auditing purposes later if appropriate
if (request.isAccessAttempt()) {
@@ -223,7 +227,10 @@ public class RangerNiFiAuthorizer implements Authorizer,
AuthorizationAuditor {
event.setResourceType(RANGER_NIFI_RESOURCE_NAME);
event.setResourcePath(request.getRequestedResource().getIdentifier());
+ final long start = System.nanoTime();
defaultAuditHandler.logAuthzAudit(event);
+ final long nanos = System.nanoTime() - start;
+ logger.debug("Logged authorization audits to Ranger in {} nanos",
numberFormat.format(nanos));
}
}