[
https://issues.apache.org/jira/browse/NIFI-1192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15023560#comment-15023560
]
ASF GitHub Bot commented on NIFI-1192:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/131#discussion_r45687929
--- Diff:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
---
@@ -203,26 +205,69 @@
return relationships;
}
- @OnScheduled
public void createConsumers(final ProcessContext context) {
final String topic = context.getProperty(TOPIC).getValue();
- final Map<String, Integer> topicCountMap = new HashMap<>(1);
- topicCountMap.put(topic, context.getMaxConcurrentTasks());
-
final Properties props = new Properties();
props.setProperty("zookeeper.connect",
context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
props.setProperty("group.id",
context.getProperty(GROUP_ID).getValue());
props.setProperty("client.id",
context.getProperty(CLIENT_NAME).getValue());
props.setProperty("auto.commit.interval.ms",
String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
- props.setProperty("auto.commit.enable", "true"); // just be
explicit
props.setProperty("auto.offset.reset",
context.getProperty(AUTO_OFFSET_RESET).getValue());
- props.setProperty("zk.connectiontimeout.ms",
context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
+ props.setProperty("zookeeper.connection.timeout.ms",
context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
props.setProperty("socket.timeout.ms",
context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
+ for (final Entry<PropertyDescriptor, String> entry :
context.getProperties().entrySet()) {
+ PropertyDescriptor descriptor = entry.getKey();
+ if (descriptor.isDynamic()) {
+ if (props.containsKey(descriptor.getName())) {
+ this.getLogger().warn("Overriding existing property '"
+ descriptor.getName() + "' which had value of '"
+ +
props.getProperty(descriptor.getName()) + "' with dynamically set value '"
+ + entry.getValue() + "'.");
+ }
+ props.setProperty(descriptor.getName(), entry.getValue());
+ }
+ }
+
+ /*
+ * Unless user sets it to some explicit value we are setting it to
the
+ * lowest possible value of 1 millisecond to ensure the
+ * consumerStream.hasNext() doesn't block. See
+ * http://kafka.apache.org/documentation.html#configuration) as
well as
+ * comment in 'catch ConsumerTimeoutException' in onTrigger() for
more
+ * explanation as to the reasoning behind it.
+ */
+ if (!props.containsKey("consumer.timeout.ms")) {
+ this.getLogger().info("Setting 'consumer.timeout.ms' to 1
milliseconds to avoid consumer"
+ + " block in the event when no events are
present in Kafka topic. If you wish to change this value "
+ + " set it as dynamic property. If you wish to
explicitly enable consumer block (at yoru own risk)"
--- End diff --
typo here: says "yoru" instead of "your"
> Rework Get/PutKafka for dynamic properties and improve performance under
> certain conditions
> -------------------------------------------------------------------------------------------
>
> Key: NIFI-1192
> URL: https://issues.apache.org/jira/browse/NIFI-1192
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 0.3.0
> Reporter: Oleg Zhurakousky
> Assignee: Oleg Zhurakousky
> Priority: Critical
> Fix For: 0.4.0
>
>
> Currently Kafka does not honor dynamic properties, which means that aside from the 8
> exposed properties, no others can be set
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)