[ 
https://issues.apache.org/jira/browse/NIFI-1192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15023560#comment-15023560
 ] 

ASF GitHub Bot commented on NIFI-1192:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/131#discussion_r45687929
  
    --- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
 ---
    @@ -203,26 +205,69 @@
             return relationships;
         }
     
    -    @OnScheduled
         public void createConsumers(final ProcessContext context) {
             final String topic = context.getProperty(TOPIC).getValue();
     
    -        final Map<String, Integer> topicCountMap = new HashMap<>(1);
    -        topicCountMap.put(topic, context.getMaxConcurrentTasks());
    -
             final Properties props = new Properties();
             props.setProperty("zookeeper.connect", 
context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
             props.setProperty("group.id", 
context.getProperty(GROUP_ID).getValue());
             props.setProperty("client.id", 
context.getProperty(CLIENT_NAME).getValue());
             props.setProperty("auto.commit.interval.ms", 
String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
    -        props.setProperty("auto.commit.enable", "true"); // just be 
explicit
             props.setProperty("auto.offset.reset", 
context.getProperty(AUTO_OFFSET_RESET).getValue());
    -        props.setProperty("zk.connectiontimeout.ms", 
context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
    +        props.setProperty("zookeeper.connection.timeout.ms", 
context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
             props.setProperty("socket.timeout.ms", 
context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
     
    +        for (final Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
    +            PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                if (props.containsKey(descriptor.getName())) {
    +                    this.getLogger().warn("Overriding existing property '" 
+ descriptor.getName() + "' which had value of '"
    +                                    + 
props.getProperty(descriptor.getName()) + "' with dynamically set value '"
    +                                    + entry.getValue() + "'.");
    +                }
    +                props.setProperty(descriptor.getName(), entry.getValue());
    +            }
    +        }
    +
    +        /*
    +         * Unless user sets it to some explicit value we are setting it to 
the
    +         * lowest possible value of 1 millisecond to ensure the
    +         * consumerStream.hasNext() doesn't block. See
    +         * http://kafka.apache.org/documentation.html#configuration) as 
well as
    +         * comment in 'catch ConsumerTimeoutException' in onTrigger() for 
more
    +         * explanation as to the reasoning behind it.
    +         */
    +        if (!props.containsKey("consumer.timeout.ms")) {
    +            this.getLogger().info("Setting 'consumer.timeout.ms' to 1 
milliseconds to avoid consumer"
    +                            + " block in the event when no events are 
present in Kafka topic. If you wish to change this value "
    +                            + " set it as dynamic property. If you wish to 
explicitly enable consumer block (at yoru own risk)"
    --- End diff --
    
    typo here: says "yoru" instead of "your"


> Rework Get/PutKafka for dynamic properties and improve performance under 
> certain conditions
> -------------------------------------------------------------------------------------------
>
>                 Key: NIFI-1192
>                 URL: https://issues.apache.org/jira/browse/NIFI-1192
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 0.3.0
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>            Priority: Critical
>             Fix For: 0.4.0
>
>
> Currently the Kafka processors do not honor dynamic properties, which means 
> that aside from the 8 exposed properties, no others can be set



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to