Hi, Navina:

    Thanks for your reply. The files are listed below.

Your help is highly appreciated.

Sincerely,
Selina

----------------------------The producer.properties for Kafka:----------------------------


############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=localhost:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none

# message encoder
serializer.class=kafka.serializer.DefaultEncoder

# allow topic level compression
#compressed.topics=

############################# Async Producer #############################
# maximum time, in milliseconds, for buffering data on the producer queue
#queue.buffering.max.ms=

# the maximum size of the blocking queue for buffering on the producer
#queue.buffering.max.messages=

# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueue.timeout.ms=

# the number of messages batched at the producer
#batch.num.messages=
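
A quick way to sanity-check a file like the producer.properties above is to parse it and confirm that metadata.broker.list really has the host1:port1,host2:port2 form its comment describes. The following is only a minimal Python sketch: parse_properties and parse_broker_list are hypothetical helper names, not part of Kafka, and the parser handles only a simplified subset of the Java .properties format (no line continuations, no escapes).

```python
def parse_properties(text):
    """Parse a simplified subset of the Java .properties format.

    Assumption: one key=value per line; '#' or '!' starts a comment line.
    Line continuations and escape sequences are NOT handled.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _sep, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def parse_broker_list(value):
    """Split 'host1:port1,host2:port2' into a list of (host, port) tuples."""
    brokers = []
    for entry in value.split(","):
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        brokers.append((host, int(port)))
    return brokers


# A few lines taken from the producer.properties above.
sample = """\
# format: host1:port1,host2:port2 ...
metadata.broker.list=localhost:9092
producer.type=sync
#partitioner.class=
"""

props = parse_properties(sample)
brokers = parse_broker_list(props["metadata.broker.list"])
print(brokers)
```

Commented-out keys such as partitioner.class are skipped, so only settings that are actually active end up in the dictionary.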


-----------------------------the server.properties for Kafka--------------------------------------
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma-separated list of host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
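
The advertised.host.name comment in the server.properties above describes a fallback order: use advertised.host.name if set, otherwise host.name, otherwise whatever java.net.InetAddress.getCanonicalHostName() returns. The Python sketch below only illustrates that order. effective_advertised_host is a hypothetical helper, not Kafka code, and the canonical_hostname callable is merely a stand-in for the Java lookup (Python's socket.getfqdn is similar but not identical).

```python
import socket


def effective_advertised_host(props, canonical_hostname=socket.getfqdn):
    """Return the hostname a broker would advertise under the documented fallback.

    props: dict of active (uncommented) server.properties settings.
    canonical_hostname: stand-in for InetAddress.getCanonicalHostName()
    (an assumption made for this illustration only).
    """
    for key in ("advertised.host.name", "host.name"):
        value = props.get(key, "").strip()
        if value:
            return value
    return canonical_hostname()


# In the file above both host settings are commented out, so neither is active.
active = {"broker.id": "0", "port": "9092"}

# "my-laptop.example" is a made-up value so the demo does not depend on DNS.
print(effective_advertised_host(active, canonical_hostname=lambda: "my-laptop.example"))

# Setting host.name explicitly short-circuits the lookup.
print(effective_advertised_host({"host.name": "localhost"}))
```

With both settings left commented out, the name that clients are handed comes from the machine's own hostname resolution rather than from anything in the file, which is worth keeping in mind when the address in a client log differs from the one in the configuration.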



---------------The consumer.properties for kafka---------------

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=127.0.0.1:2181

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=test-consumer-group

#consumer timeout
#consumer.timeout.ms=5000
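
Both the server and consumer files above carry a zookeeper.connect value, described as comma separated host:port pairs with an optional chroot string appended. As a rough illustration of that format, here is a small Python sketch. parse_zookeeper_connect is a hypothetical helper, the "/kafka" chroot is a made-up example, and the sketch assumes every entry spells out its port.

```python
def parse_zookeeper_connect(value):
    """Split a zookeeper.connect string into ([(host, port), ...], chroot).

    Assumptions: every entry has an explicit port, and anything from the
    first '/' onward is the chroot path ('' when absent).
    """
    hosts_part, slash, rest = value.strip().partition("/")
    servers = []
    for entry in hosts_part.split(","):
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        servers.append((host, int(port)))
    chroot = "/" + rest if slash else ""
    return servers, chroot


# Value from the consumer.properties above.
print(parse_zookeeper_connect("127.0.0.1:2181"))

# The multi-server example from the comments, plus a hypothetical chroot.
print(parse_zookeeper_connect("127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/kafka"))
```

Note that the server file uses localhost:2181 while the consumer file uses 127.0.0.1:2181; parsing both makes it easy to compare the port and chroot parts, which have to match for the consumer to see the brokers' registrations.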


On Fri, Jul 24, 2015 at 1:39 PM, Navina Ramesh <nram...@linkedin.com.invalid> wrote:

> Hi Selina,
> Looks like the attachment got filtered. Can you paste the config in the
> email or at pastebin?
>
> Thanks!
> Navina
>
> On Fri, Jul 24, 2015 at 1:18 PM, Job-Selina Wu <swucaree...@gmail.com> wrote:
>
> > Hi, Yi:
> >
> >       I am wondering whether the problem can be fixed with the
> > "max.message.size" parameter of kafka.producer.ProducerConfig for the topic size?
> >
> >       My HTTP server sends messages to Kafka. The last message shown on
> > the console is
> > "message=timestamp=06-20-2015 id=678 ip=22.231.113.68 browser=Safari
> > postalCode=95066 url=http://sample2.com language=ENG mobileBrand=Apple
> > count=4269"
> >
> > However, Kafka raised an exception at the 4244th message.
> > The error is below, and Kafka does not accept any new messages after this.
> >
> > "[2015-07-24 12:46:11,078] WARN
> > [console-consumer-61156_Selinas-MacBook-Pro.local-1437766693294-a68fc532-leader-finder-thread],
> > Failed to find leader for Set([http-demo,0])
> > (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> > kafka.common.KafkaException: fetching topic metadata for topics
> > [Set(http-demo)] from broker [ArrayBuffer(id:0,host:10.1.10.173,port:9092)]
> > failed
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> > at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > Caused by: java.nio.channels.ClosedChannelException
> > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> > at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > ... 3 more
> > [2015-07-24 12:46:11,287] WARN Fetching topic metadata with correlation id
> > 21 for topics [Set(http-demo)] from broker
> > [id:0,host:10.1.10.173,port:9092] failed (kafka.client.ClientUtils$)
> > java.nio.channels.ClosedChannelException
> > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> > at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> > at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)"
> >
> >
> > After the error:
> > Listing the topics works and shows the right topic, but I cannot show
> > its content from the command line:
> >
> > Selinas-MacBook-Pro:samza-Demo selina$ deploy/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
> > http-demo
> > Selinas-MacBook-Pro:samza-Demo selina$ deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic http-demo
> > [2015-07-24 12:47:38,730] WARN
> > [console-consumer-10297_Selinas-MacBook-Pro.local-1437767258570-1a809d87],
> > no brokers found when trying to rebalance.
> > (kafka.consumer.ZookeeperConsumerConnector)
> >
> > Attached are my Kafka properties for the server and producer.
> >
> > Your help is highly appreciated
> >
> >
> > Sincerely,
> > Selina
> >
> >
> >
> > On Thu, Jul 23, 2015 at 11:16 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >
> >> Hi, Selina,
> >>
> >> Your question is not clear.
> >> {quote}
> >> When messages are sent to Kafka by KafkaProducer, it always fails
> >> once more than 3000 - 4000 messages have been sent.
> >> {quote}
> >>
> >> What's failing? The error stack shows errors on the consumer side, and you
> >> were referring to failures to produce to Kafka. Could you be more specific
> >> about your failure scenario?
> >>
> >> -Yi
> >>
> >> On Thu, Jul 23, 2015 at 5:46 PM, Job-Selina Wu <swucaree...@gmail.com>
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> >     When messages are sent to Kafka by KafkaProducer, it always fails
> >> > once more than 3000 - 4000 messages have been sent. The error is shown
> >> > below. I am wondering if there is any topic size I need to set in the
> >> > Samza configuration?
> >> >
> >> >
> >> > [2015-07-23 17:30:03,792] WARN
> >> > [console-consumer-84579_Selinas-MacBook-Pro.local-1437697324624-eecb4f40-leader-finder-thread],
> >> > Failed to find leader for Set([http-demo,0])
> >> > (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> >> > kafka.common.KafkaException: fetching topic metadata for topics
> >> > [Set(http-demo)] from broker [ArrayBuffer()] failed
> >> >         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> >> >         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> >> >         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> >> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> >> > ^CConsumed 4327 messages
> >> >
> >> > Your reply and comment will be highly appreciated.
> >> >
> >> >
> >> > Sincerely,
> >> > Selina
> >> >
> >>
> >
> >
>
>
> --
> Navina R.
>
