Hi, All:

Do you think this could be caused by memory, for example the virtual memory size?

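Before looking at memory, it may be worth ruling out a plain connectivity problem, since the quoted trace below ends in a ClosedChannelException against broker 10.1.10.173:9092 and the console consumer then reports "no brokers found". Here is a minimal sketch of such a check in Python. The helper name broker_reachable is hypothetical, the host and port are the ones printed in the trace, and the check only tells whether the TCP port accepts connections, not whether Kafka itself is healthy.

```python
import socket


def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper for illustration only. It checks the listening
    socket, not the state of the Kafka broker behind it.
    """
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable network, and timeouts.
        return False


# Example call, using the host and port from the quoted error message:
#   broker_reachable("10.1.10.173", 9092)
```

If this returns False while the producer is still sending, the broker process has probably stopped or is bound to a different address, and the broker's own log would be the next place to look.
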
Sincerely,
Selina

On Fri, Jul 24, 2015 at 1:54 PM, Job-Selina Wu <swucaree...@gmail.com> wrote:

> Hi, Navina:
>
> Thanks for your reply: the files are listed below:
>
> Your help is highly appreciated.
>
> Sincerely,
> Selina
>
> ----------------------------The producer.properties for Kafka:----------------------------
>
> ############################# Producer Basics #############################
>
> # list of brokers used for bootstrapping knowledge about the rest of the cluster
> # format: host1:port1,host2:port2 ...
> metadata.broker.list=localhost:9092
>
> # name of the partitioner class for partitioning events; default partition spreads data randomly
> #partitioner.class=
>
> # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
> producer.type=sync
>
> # specify the compression codec for all data generated: none, gzip, snappy, lz4.
> # the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
> compression.codec=none
>
> # message encoder
> serializer.class=kafka.serializer.DefaultEncoder
>
> # allow topic level compression
> #compressed.topics=
>
> ############################# Async Producer #############################
> # maximum time, in milliseconds, for buffering data on the producer queue
> #queue.buffering.max.ms=
>
> # the maximum size of the blocking queue for buffering on the producer
> #queue.buffering.max.messages=
>
> # Timeout for event enqueue:
> # 0: events will be enqueued immediately or dropped if the queue is full
> # -ve: enqueue will block indefinitely if the queue is full
> # +ve: enqueue will block up to this many milliseconds if the queue is full
> #queue.enqueue.timeout.ms=
>
> # the number of messages batched at the producer
> #batch.num.messages=
>
>
> -----------------------------the server.properties for Kafka--------------------------------------
> ############################# Server Basics #############################
>
> # The id of the broker. This must be set to a unique integer for each broker.
> broker.id=0
>
> ############################# Socket Server Settings #############################
>
> # The port the socket server listens on
> port=9092
>
> # Hostname the broker will bind to. If not set, the server will bind to all interfaces
> #host.name=localhost
>
> # Hostname the broker will advertise to producers and consumers. If not set, it uses the
> # value for "host.name" if configured. Otherwise, it will use the value returned from
> # java.net.InetAddress.getCanonicalHostName().
> #advertised.host.name=<hostname routable by clients>
>
> # The port to publish to ZooKeeper for clients to use. If this is not set,
> # it will publish the same port that the broker binds to.
> #advertised.port=<port accessible by clients>
>
> # The number of threads handling network requests
> num.network.threads=3
>
> # The number of threads doing disk I/O
> num.io.threads=8
>
> # The send buffer (SO_SNDBUF) used by the socket server
> socket.send.buffer.bytes=102400
>
> # The receive buffer (SO_RCVBUF) used by the socket server
> socket.receive.buffer.bytes=102400
>
> # The maximum size of a request that the socket server will accept (protection against OOM)
> socket.request.max.bytes=104857600
>
>
> ############################# Log Basics #############################
>
> # A comma separated list of directories under which to store log files
> log.dirs=/tmp/kafka-logs
>
> # The default number of log partitions per topic. More partitions allow greater
> # parallelism for consumption, but this will also result in more files across
> # the brokers.
> num.partitions=1
>
> # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
> # This value is recommended to be increased for installations with data dirs located in RAID array.
> num.recovery.threads.per.data.dir=1
>
> ############################# Log Flush Policy #############################
>
> # Messages are immediately written to the filesystem but by default we only fsync() to sync
> # the OS cache lazily. The following configurations control the flush of data to disk.
> # There are a few important trade-offs here:
> #    1. Durability: Unflushed data may be lost if you are not using replication.
> #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
> #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
> # The settings below allow one to configure the flush policy to flush data after a period of time or
> # every N messages (or both). This can be done globally and overridden on a per-topic basis.
>
> # The number of messages to accept before forcing a flush of data to disk
> #log.flush.interval.messages=10000
>
> # The maximum amount of time a message can sit in a log before we force a flush
> #log.flush.interval.ms=1000
>
> ############################# Log Retention Policy #############################
>
> # The following configurations control the disposal of log segments. The policy can
> # be set to delete segments after a period of time, or after a given size has accumulated.
> # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
> # from the end of the log.
>
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=168
>
> # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
> # segments don't drop below log.retention.bytes.
> #log.retention.bytes=1073741824
>
> # The maximum size of a log segment file. When this size is reached a new log segment will be created.
> log.segment.bytes=1073741824
>
> # The interval at which log segments are checked to see if they can be deleted according
> # to the retention policies
> log.retention.check.interval.ms=300000
>
> # By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
> # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
> log.cleaner.enable=false
>
> ############################# Zookeeper #############################
>
> # Zookeeper connection string (see zookeeper docs for details).
> # This is a comma separated host:port pairs, each corresponding to a zk
> # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
> # You can also append an optional chroot string to the urls to specify the
> # root directory for all kafka znodes.
> zookeeper.connect=localhost:2181
>
> # Timeout in ms for connecting to zookeeper
> zookeeper.connection.timeout.ms=6000
>
>
> ---------------The consumer.properties for kafka---------------
>
> # Zookeeper connection string
> # comma separated host:port pairs, each corresponding to a zk
> # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
> zookeeper.connect=127.0.0.1:2181
>
> # timeout in ms for connecting to zookeeper
> zookeeper.connection.timeout.ms=6000
>
> #consumer group id
> group.id=test-consumer-group
>
> #consumer timeout
> #consumer.timeout.ms=5000
>
>
> On Fri, Jul 24, 2015 at 1:39 PM, Navina Ramesh <nram...@linkedin.com.invalid> wrote:
>
>> Hi Selina,
>> Looks like the attachment got filtered. Can you paste the config in the
>> email or at pastebin?
>>
>> Thanks!
>> Navina
>>
>> On Fri, Jul 24, 2015 at 1:18 PM, Job-Selina Wu <swucaree...@gmail.com> wrote:
>>
>> > Hi, Yi:
>> >
>> > I am wondering if the problem can be fixed by the parameter
>> > "max.message.size" at kafka.producer.ProducerConfig for the topic size?
>> >
>> > My HTTP server sends messages to Kafka. The last message shown on the
>> > console is
>> > "message=timestamp=06-20-2015 id=678 ip=22.231.113.68 browser=Safari
>> > postalCode=95066 url=http://sample2.com language=ENG mobileBrand=Apple
>> > count=4269"
>> >
>> > However, Kafka threw an exception starting from the 4244th message.
>> > The error is below, and Kafka does not accept any new messages after this.
>> >
>> > "[2015-07-24 12:46:11,078] WARN [console-consumer-61156_Selinas-MacBook-Pro.local-1437766693294-a68fc532-leader-finder-thread], Failed to find leader for Set([http-demo,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
>> > kafka.common.KafkaException: fetching topic metadata for topics [Set(http-demo)] from broker [ArrayBuffer(id:0,host:10.1.10.173,port:9092)] failed
>> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>> > at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>> > Caused by: java.nio.channels.ClosedChannelException
>> > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>> > at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>> > ... 3 more
>> > [2015-07-24 12:46:11,287] WARN Fetching topic metadata with correlation id 21 for topics [Set(http-demo)] from broker [id:0,host:10.1.10.173,port:9092] failed (kafka.client.ClientUtils$)
>> > java.nio.channels.ClosedChannelException
>> > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>> > at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>> > at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)"
>> >
>> >
>> > After the error:
>> > Listing the topics still works, but I cannot show the topic's content
>> > from the command line.
>> >
>> > Selinas-MacBook-Pro:samza-Demo selina$ deploy/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
>> > http-demo
>> > Selinas-MacBook-Pro:samza-Demo selina$ deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic http-demo
>> > [2015-07-24 12:47:38,730] WARN [console-consumer-10297_Selinas-MacBook-Pro.local-1437767258570-1a809d87], no brokers found when trying to rebalance. (kafka.consumer.ZookeeperConsumerConnector)
>> >
>> > Attached are my Kafka properties for the server and producer.
>> >
>> > Your help is highly appreciated
>> >
>> >
>> > Sincerely,
>> > Selina
>> >
>> >
>> > On Thu, Jul 23, 2015 at 11:16 PM, Yi Pan <nickpa...@gmail.com> wrote:
>> >
>> >> Hi, Selina,
>> >>
>> >> Your question is not clear.
>> >> {quote}
>> >> When the messages were sent to Kafka by KafkaProducer, it always failed
>> >> when there were more than 3000 - 4000 messages.
>> >> {quote}
>> >>
>> >> What's failing? The error stack shows errors on the consumer side, but
>> >> you were referring to failures to produce to Kafka. Could you be more
>> >> specific about your failure scenario?
>> >>
>> >> -Yi
>> >>
>> >> On Thu, Jul 23, 2015 at 5:46 PM, Job-Selina Wu <swucaree...@gmail.com> wrote:
>> >>
>> >> > Hi,
>> >> >
>> >> > When the messages were sent to Kafka by KafkaProducer, it always failed
>> >> > when there were more than 3000 - 4000 messages. The error is shown below.
>> >> > I am wondering whether there is a topic size I need to set in the Samza
>> >> > configuration?
>> >> >
>> >> >
>> >> > [2015-07-23 17:30:03,792] WARN [console-consumer-84579_Selinas-MacBook-Pro.local-1437697324624-eecb4f40-leader-finder-thread], Failed to find leader for Set([http-demo,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
>> >> > kafka.common.KafkaException: fetching topic metadata for topics [Set(http-demo)] from broker [ArrayBuffer()] failed
>> >> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>> >> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>> >> > at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>> >> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>> >> > ^CConsumed 4327 messages
>> >> >
>> >> > Your reply and comment will be highly appreciated.
>> >> >
>> >> >
>> >> > Sincerely,
>> >> > Selina
>> >> >
>> >>
>> >
>>
>> --
>> Navina R.
>>