Repository: nifi
Updated Branches:
  refs/heads/master a6133d4ce -> 04db806ac


NIFI-2614 This closes #944. Added support for max.request.size


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/04db806a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/04db806a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/04db806a

Branch: refs/heads/master
Commit: 04db806aceca433e05b68d2b317e07780f150e43
Parents: a6133d4
Author: Oleg Zhurakousky <[email protected]>
Authored: Thu Aug 25 12:04:51 2016 -0400
Committer: joewitt <[email protected]>
Committed: Thu Aug 25 13:51:08 2016 -0400

----------------------------------------------------------------------
 .../kafka/pubsub/PublishKafka_0_10.java         | 15 ++++++++++-
 .../kafka/pubsub/PublishingContext.java         | 27 +++++---------------
 .../kafka/pubsub/PublishingContextTest.java     | 15 -----------
 .../processors/kafka/pubsub/PublishKafka.java   | 15 ++++++++++-
 .../kafka/pubsub/PublishingContext.java         | 27 +++++---------------
 .../kafka/pubsub/PublishingContextTest.java     | 15 -----------
 6 files changed, 40 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/04db806a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
index e29f2af..3ad2fc6 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
@@ -47,6 +47,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
@@ -133,6 +134,15 @@ public class PublishKafka_0_10 extends 
AbstractSessionFactoryProcessor {
             .defaultValue("30 sec")
             .build();
 
+    static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
+            .name("max.request.size")
+            .displayName("Max Request Size")
+            .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .build();
+
     static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
             .name("kafka-key")
             .displayName("Kafka Key")
@@ -207,6 +217,7 @@ public class PublishKafka_0_10 extends 
AbstractSessionFactoryProcessor {
         _descriptors.add(DELIVERY_GUARANTEE);
         _descriptors.add(KEY);
         _descriptors.add(MESSAGE_DEMARCATOR);
+        _descriptors.add(MAX_REQUEST_SIZE);
         _descriptors.add(META_WAIT_TIME);
         _descriptors.add(PARTITION_CLASS);
         _descriptors.add(COMPRESSION_CODEC);
@@ -377,6 +388,7 @@ public class PublishKafka_0_10 extends 
AbstractSessionFactoryProcessor {
         KafkaProcessorUtils.buildCommonKafkaProperties(context, 
ProducerConfig.class, kafkaProps);
         kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        kafkaProps.put("max.request.size", String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()));
         this.brokers = 
context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         final Properties props = new Properties();
         props.putAll(kafkaProps);
@@ -461,7 +473,8 @@ public class PublishKafka_0_10 extends 
AbstractSessionFactoryProcessor {
                     
.evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8)
 : null;
         }
 
-        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex,
+                context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue());
         publishingContext.setKeyBytes(keyBytes);
         publishingContext.setDelimiterBytes(delimiterBytes);
         return publishingContext;

http://git-wip-us.apache.org/repos/asf/nifi/blob/04db806a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
index bda29e6..1513481 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
@@ -31,14 +31,7 @@ class PublishingContext {
 
     private final int lastAckedMessageIndex;
 
-    /*
-     * We're using the default value from Kafka. We are using it to control the
-     * message size before it goes to to Kafka thus limiting possibility of a
-     * late failures in Kafka client.
-     */
-    private int maxRequestSize = 1048576; // kafka default
-
-    private boolean maxRequestSizeSet;
+    private final int maxRequestSize;
 
     private byte[] keyBytes;
 
@@ -49,10 +42,15 @@ class PublishingContext {
     }
 
     PublishingContext(InputStream contentStream, String topic, int 
lastAckedMessageIndex) {
+        this(contentStream, topic, lastAckedMessageIndex, 1048576);
+    }
+
+    PublishingContext(InputStream contentStream, String topic, int 
lastAckedMessageIndex, int maxRequestSize) {
         this.validateInput(contentStream, topic, lastAckedMessageIndex);
         this.contentStream = contentStream;
         this.topic = topic;
         this.lastAckedMessageIndex = lastAckedMessageIndex;
+        this.maxRequestSize = maxRequestSize;
     }
 
     @Override
@@ -106,19 +104,6 @@ class PublishingContext {
         }
     }
 
-    void setMaxRequestSize(int maxRequestSize) {
-        if (!this.maxRequestSizeSet) {
-            if (maxRequestSize > 0) {
-                this.maxRequestSize = maxRequestSize;
-                this.maxRequestSizeSet = true;
-            } else {
-                throw new IllegalArgumentException("'maxRequestSize' must be > 
0");
-            }
-        } else {
-            throw new IllegalArgumentException("'maxRequestSize' can only be 
set once per instance");
-        }
-    }
-
     private void assertBytesValid(byte[] bytes) {
         if (bytes != null) {
             if (bytes.length == 0) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/04db806a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
index 4a9a1c0..76c29cd 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
@@ -87,20 +87,5 @@ public class PublishingContextTest {
         } catch (IllegalArgumentException e) {
             // success
         }
-
-        publishingContext.setMaxRequestSize(1024);
-        try {
-            publishingContext.setMaxRequestSize(1024);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        try {
-            publishingContext.setMaxRequestSize(-10);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/04db806a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
index 4745984..65f386e 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
@@ -47,6 +47,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
@@ -133,6 +134,15 @@ public class PublishKafka extends 
AbstractSessionFactoryProcessor {
             .defaultValue("30 sec")
             .build();
 
+    static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
+            .name("max.request.size")
+            .displayName("Max Request Size")
+            .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .build();
+
     static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
             .name("kafka-key")
             .displayName("Kafka Key")
@@ -207,6 +217,7 @@ public class PublishKafka extends 
AbstractSessionFactoryProcessor {
         _descriptors.add(DELIVERY_GUARANTEE);
         _descriptors.add(KEY);
         _descriptors.add(MESSAGE_DEMARCATOR);
+        _descriptors.add(MAX_REQUEST_SIZE);
         _descriptors.add(META_WAIT_TIME);
         _descriptors.add(PARTITION_CLASS);
         _descriptors.add(COMPRESSION_CODEC);
@@ -377,6 +388,7 @@ public class PublishKafka extends 
AbstractSessionFactoryProcessor {
         KafkaProcessorUtils.buildCommonKafkaProperties(context, 
ProducerConfig.class, kafkaProps);
         kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        kafkaProps.put("max.request.size", String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()));
         this.brokers = 
context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         final Properties props = new Properties();
         props.putAll(kafkaProps);
@@ -461,7 +473,8 @@ public class PublishKafka extends 
AbstractSessionFactoryProcessor {
                     
.evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8)
 : null;
         }
 
-        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex,
+                context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue());
         publishingContext.setKeyBytes(keyBytes);
         publishingContext.setDelimiterBytes(delimiterBytes);
         return publishingContext;

http://git-wip-us.apache.org/repos/asf/nifi/blob/04db806a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
index bda29e6..1513481 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
@@ -31,14 +31,7 @@ class PublishingContext {
 
     private final int lastAckedMessageIndex;
 
-    /*
-     * We're using the default value from Kafka. We are using it to control the
-     * message size before it goes to to Kafka thus limiting possibility of a
-     * late failures in Kafka client.
-     */
-    private int maxRequestSize = 1048576; // kafka default
-
-    private boolean maxRequestSizeSet;
+    private final int maxRequestSize;
 
     private byte[] keyBytes;
 
@@ -49,10 +42,15 @@ class PublishingContext {
     }
 
     PublishingContext(InputStream contentStream, String topic, int 
lastAckedMessageIndex) {
+        this(contentStream, topic, lastAckedMessageIndex, 1048576);
+    }
+
+    PublishingContext(InputStream contentStream, String topic, int 
lastAckedMessageIndex, int maxRequestSize) {
         this.validateInput(contentStream, topic, lastAckedMessageIndex);
         this.contentStream = contentStream;
         this.topic = topic;
         this.lastAckedMessageIndex = lastAckedMessageIndex;
+        this.maxRequestSize = maxRequestSize;
     }
 
     @Override
@@ -106,19 +104,6 @@ class PublishingContext {
         }
     }
 
-    void setMaxRequestSize(int maxRequestSize) {
-        if (!this.maxRequestSizeSet) {
-            if (maxRequestSize > 0) {
-                this.maxRequestSize = maxRequestSize;
-                this.maxRequestSizeSet = true;
-            } else {
-                throw new IllegalArgumentException("'maxRequestSize' must be > 
0");
-            }
-        } else {
-            throw new IllegalArgumentException("'maxRequestSize' can only be 
set once per instance");
-        }
-    }
-
     private void assertBytesValid(byte[] bytes) {
         if (bytes != null) {
             if (bytes.length == 0) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/04db806a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
index 4a9a1c0..76c29cd 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
@@ -87,20 +87,5 @@ public class PublishingContextTest {
         } catch (IllegalArgumentException e) {
             // success
         }
-
-        publishingContext.setMaxRequestSize(1024);
-        try {
-            publishingContext.setMaxRequestSize(1024);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        try {
-            publishingContext.setMaxRequestSize(-10);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
     }
 }

Reply via email to