Repository: nifi
Updated Branches:
  refs/heads/master 7aabb99bc -> 6471f66ac


NIFI-3402 - Added etag support to InvokeHTTP

Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>

This closes #2150.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6471f66a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6471f66a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6471f66a

Branch: refs/heads/master
Commit: 6471f66acd763794f42c1bf968c7e53f04058e76
Parents: 7aabb99
Author: m-hogue <hogu...@gmail.com>
Authored: Tue Sep 12 14:51:19 2017 -0400
Committer: Pierre Villard <pierre.villard...@gmail.com>
Committed: Tue Mar 13 10:00:11 2018 +0100

----------------------------------------------------------------------
 .../nifi/processors/standard/InvokeHTTP.java    | 55 +++++++++++++++++++-
 .../standard/util/TestInvokeHttpCommon.java     |  1 +
 2 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6471f66a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index 5f6e66c..0f0f878 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -20,6 +20,8 @@ import com.burgstaller.okhttp.AuthenticationCacheInterceptor;
 import com.burgstaller.okhttp.CachingAuthenticatorDecorator;
 import com.burgstaller.okhttp.digest.CachingAuthenticator;
 import com.burgstaller.okhttp.digest.DigestAuthenticator;
+import com.google.common.io.Files;
+import okhttp3.Cache;
 import okhttp3.Credentials;
 import okhttp3.MediaType;
 import okhttp3.OkHttpClient;
@@ -48,6 +50,7 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -70,6 +73,8 @@ import javax.net.ssl.SSLSocketFactory;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 import javax.net.ssl.X509TrustManager;
+
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -388,6 +393,24 @@ public final class InvokeHTTP extends AbstractProcessor {
             .allowableValues("true", "false")
             .build();
 
+    public static final PropertyDescriptor PROP_USE_ETAG = new 
PropertyDescriptor.Builder()
+            .name("use-etag")
+            .description("Enable HTTP entity tag (ETag) support for HTTP 
requests.")
+            .displayName("Use HTTP ETag")
+            .required(true)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
+    public static final PropertyDescriptor PROP_ETAG_MAX_CACHE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("etag-max-cache-size")
+            .description("The maximum size that the ETag cache should be 
allowed to grow to. The default size is 10MB.")
+            .displayName("Maximum ETag Cache Size")
+            .required(true)
+            .defaultValue("10MB")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
     public static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
             PROP_METHOD,
             PROP_URL,
@@ -413,7 +436,9 @@ public final class InvokeHTTP extends AbstractProcessor {
             PROP_CONTENT_TYPE,
             PROP_SEND_BODY,
             PROP_USE_CHUNKED_ENCODING,
-            PROP_PENALIZE_NO_RETRY));
+            PROP_PENALIZE_NO_RETRY,
+            PROP_USE_ETAG,
+            PROP_ETAG_MAX_CACHE_SIZE));
 
     // relationships
     public static final Relationship REL_SUCCESS_REQ = new 
Relationship.Builder()
@@ -559,6 +584,13 @@ public final class InvokeHTTP extends AbstractProcessor {
             isHttpsProxy = HTTPS.equals(proxyType);
         }
 
+        // configure ETag cache if enabled
+        final boolean etagEnabled = 
context.getProperty(PROP_USE_ETAG).asBoolean();
+        if(etagEnabled) {
+            final int maxCacheSizeBytes = 
context.getProperty(PROP_ETAG_MAX_CACHE_SIZE).asDataSize(DataUnit.B).intValue();
+            okHttpClientBuilder.cache(new Cache(getETagCacheDir(), 
maxCacheSizeBytes));
+        }
+
         // Set timeouts
         
okHttpClientBuilder.connectTimeout((context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()),
 TimeUnit.MILLISECONDS);
         
okHttpClientBuilder.readTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(),
 TimeUnit.MILLISECONDS);
@@ -718,6 +750,14 @@ public final class InvokeHTTP extends AbstractProcessor {
         final int maxAttributeSize = 
context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
         final ComponentLog logger = getLogger();
 
+        // log ETag cache metrics
+        final boolean eTagEnabled = 
context.getProperty(PROP_USE_ETAG).asBoolean();
+        if(eTagEnabled && logger.isDebugEnabled()) {
+            final Cache cache = okHttpClient.cache();
+            logger.debug("OkHttp ETag cache metrics :: Request Count: {} | 
Network Count: {} | Hit Count: {}",
+                    new Object[] {cache.requestCount(), cache.networkCount(), 
cache.hitCount()});
+        }
+
         // Every request/response cycle has a unique transaction id which will 
be stored as a flowfile attribute.
         final UUID txId = UUID.randomUUID();
 
@@ -1142,6 +1182,19 @@ public final class InvokeHTTP extends AbstractProcessor {
         return contentType != null ? 
contentType.charset(StandardCharsets.UTF_8) : StandardCharsets.UTF_8;
     }
 
+    /**
+     * Retrieve the directory in which OkHttp should cache responses. This 
method opts
+     * to use a temp directory to write the cache, which means that the cache 
will be written
+     * to a new location each time this processor is scheduled.
+     *
+     * Ref: https://github.com/square/okhttp/wiki/Recipes#response-caching
+     *
+     * @return the directory in which the ETag cache should be written
+     */
+    private static File getETagCacheDir() {
+        return Files.createTempDir();
+    }
+
     private static class OverrideHostnameVerifier implements HostnameVerifier {
 
         private final String trustedHostname;

http://git-wip-us.apache.org/repos/asf/nifi/blob/6471f66a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
index 6b233d9..5242b8f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
@@ -1547,6 +1547,7 @@ public abstract class TestInvokeHttpCommon {
 
                 response.setContentType("text/plain");
                 response.setContentLength(target.length());
+                response.setHeader("Cache-Control", "public,max-age=1");
 
                 try (PrintWriter writer = response.getWriter()) {
                     writer.print(target);

Reply via email to