Hi Siddu,

The consumer supports a configuration option, "metadata.max.age.ms", which
controls how often topic metadata is refreshed. The default is fairly high
(5 minutes), so it can take up to 5 minutes for the consumer to discover
new topics matching your regular expression. You can lower this setting to
discover topics more quickly. If you wait longer than the configured max
age and the topic still hasn't been discovered, then you may have found a
bug, and we'd welcome a JIRA with the details.
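
For example, here is a minimal sketch of how the setting could be added to
properties like yours. The 5-second value and the class and method names
(MetadataAgeConfig, buildProps) are assumptions chosen for illustration, not
recommendations:

```java
import java.util.Properties;

// Sketch only: builds consumer properties with a shorter metadata refresh interval.
class MetadataAgeConfig {
    // Hypothetical value chosen for illustration; tune it for your environment.
    static final String METADATA_MAX_AGE_MS = "5000";

    static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        // Refresh topic metadata every 5 seconds instead of the 5-minute default,
        // so newly created topics matching the pattern are noticed sooner.
        props.put("metadata.max.age.ms", METADATA_MAX_AGE_MS);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("metadata.max.age.ms"));
    }
}
```

The resulting Properties would be passed to the KafkaConsumer constructor
together with your other settings. Keep in mind that a lower value means
more frequent metadata requests to the brokers.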

Thanks,
Jason

On Wed, Mar 23, 2016 at 11:02 PM, Siddu Shebannavar <
siddushebanna...@fico.com> wrote:

> Hi Team,
>
> I'm using KafkaConsumer to consume messages from topics on a Kafka server.
> * It works fine for topics created before the consumer code starts.
> * But it does not work for topics created dynamically (that is, after the
>   consumer code has started), even though the API documentation says it
>   supports dynamic topic creation.
>
>
>
> Kafka version used : 0.9.0.1
>
> Here is the link for your reference..
>
>
> https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>
>
>
> Here is the JAVA code...
>
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "test");
>     props.put("enable.auto.commit", "false");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("session.timeout.ms", "30000");
>     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>     Pattern r = Pattern.compile("siddu(\\d)*");
>
>     consumer.subscribe(r, new HandleRebalance());
>     try {
>         while (true) {
>             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
>             for (TopicPartition partition : records.partitions()) {
>                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
>                 for (ConsumerRecord<String, String> record : partitionRecords) {
>                     System.out.println(partition.partition() + ": " + record.offset() + ": " + record.value());
>                 }
>                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
>                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
>             }
>         }
>     } finally {
>         consumer.close();
>     }
>
>
> NOTE: My topic names match the regular expression, and if I restart the
> consumer, it starts reading the messages pushed to the topic.
> Any help is really appreciated.
> Thanks,
> Siddu
> FICO Bangalore.
> Ph : +91 - 9845234534
>
