Hi, Navina:

Thanks for your reply. The files are listed below.
Your help is highly appreciated.

Sincerely,
Selina

----------------------------The producer.properties for Kafka:----------------------------

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=localhost:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none

# message encoder
serializer.class=kafka.serializer.DefaultEncoder

# allow topic level compression
#compressed.topics=

############################# Async Producer #############################

# maximum time, in milliseconds, for buffering data on the producer queue
#queue.buffering.max.ms=

# the maximum size of the blocking queue for buffering on the producer
#queue.buffering.max.messages=

# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueue.timeout.ms=

# the number of messages batched at the producer
#batch.num.messages=

-----------------------------the server.properties for Kafka--------------------------------------

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to.
# If not set, the server will bind to all interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

---------------The consumer.properties for kafka---------------

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=127.0.0.1:2181

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=test-consumer-group

#consumer timeout
#consumer.timeout.ms=5000

On Fri, Jul 24, 2015 at 1:39 PM, Navina Ramesh <nram...@linkedin.com.invalid> wrote:

> Hi Selina,
> Looks like the attachment got filtered. Can you paste the config in the
> email or at pastebin?
>
> Thanks!
> Navina
>
> On Fri, Jul 24, 2015 at 1:18 PM, Job-Selina Wu <swucaree...@gmail.com> wrote:
>
> > Hi, Yi:
> >
> > I am wondering if the problem can be fixed by the parameter
> > "max.message.size" in kafka.producer.ProducerConfig for the topic size?
> >
> > My HTTP server sends messages to Kafka. The last message shown on the
> > console is
> > "message=timestamp=06-20-2015 id=678 ip=22.231.113.68 browser=Safari postalCode=95066 url=http://sample2.com language=ENG mobileBrand=Apple count=4269"
> >
> > However, Kafka threw an exception starting from the 4244th message.
> > The error is below and Kafka does not accept any new messages after this.
> >
> > "[2015-07-24 12:46:11,078] WARN [console-consumer-61156_Selinas-MacBook-Pro.local-1437766693294-a68fc532-leader-finder-thread], Failed to find leader for Set([http-demo,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> > kafka.common.KafkaException: fetching topic metadata for topics [Set(http-demo)] from broker [ArrayBuffer(id:0,host:10.1.10.173,port:9092)] failed
> >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> >     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > Caused by: java.nio.channels.ClosedChannelException
> >     at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> >     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> >     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> >     ... 3 more
> > [2015-07-24 12:46:11,287] WARN Fetching topic metadata with correlation id 21 for topics [Set(http-demo)] from broker [id:0,host:10.1.10.173,port:9092] failed (kafka.client.ClientUtils$)
> > java.nio.channels.ClosedChannelException
> >     at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> >     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> >     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> >     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)"
> >
> > After the error:
> > I can list the topic and it looks right, but I cannot show its content
> > from the command line.
> >
> > Selinas-MacBook-Pro:samza-Demo selina$ deploy/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
> > http-demo
> > Selinas-MacBook-Pro:samza-Demo selina$ deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic http-demo
> > [2015-07-24 12:47:38,730] WARN [console-consumer-10297_Selinas-MacBook-Pro.local-1437767258570-1a809d87], no brokers found when trying to rebalance. (kafka.consumer.ZookeeperConsumerConnector)
> >
> > Attached are my Kafka properties for the server and producer.
> >
> > Your help is highly appreciated.
> >
> > Sincerely,
> > Selina
> >
> > On Thu, Jul 23, 2015 at 11:16 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >
> >> Hi, Selina,
> >>
> >> Your question is not clear.
> >> {quote}
> >> When messages are sent to Kafka by KafkaProducer, it always fails
> >> once there are more than 3000 - 4000 messages.
> >> {quote}
> >>
> >> What's failing? The error stack shows errors on the consumer side, and you
> >> were referring to failures to produce to Kafka. Could you be more specific
> >> about what your failure scenario is?
> >>
> >> -Yi
> >>
> >> On Thu, Jul 23, 2015 at 5:46 PM, Job-Selina Wu <swucaree...@gmail.com> wrote:
> >>
> >> > Hi,
> >> >
> >> > When messages are sent to Kafka by KafkaProducer, it always fails
> >> > once there are more than 3000 - 4000 messages. The error is shown below.
> >> > I am wondering whether there is a topic size I need to set in the Samza configuration?
> >> >
> >> > [2015-07-23 17:30:03,792] WARN [console-consumer-84579_Selinas-MacBook-Pro.local-1437697324624-eecb4f40-leader-finder-thread], Failed to find leader for Set([http-demo,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> >> > kafka.common.KafkaException: fetching topic metadata for topics [Set(http-demo)] from broker [ArrayBuffer()] failed
> >> >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> >> >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> >> >     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> >> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> >> > ^CConsumed 4327 messages
> >> >
> >> > Your reply and comment will be highly appreciated.
> >> >
> >> > Sincerely,
> >> > Selina
> >> >
> >
>
> --
> Navina R.
>