Thanks for confirming, Cody.
To use the library, I had to do:

val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), "/consumers/topics/" + topics + "/0")

It worked well. However, I had to specify the partitionId in the zkPath. If I want the library to pick up all the partitions for a topic without my specifying the path, is that possible out of the box, or do I need to tweak it?

regards
Sunita

On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You are correct that you shouldn't have to worry about broker id.
>
> I'm honestly not sure specifically what else you are asking at this point.
>
> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
> > Just re-read the Kafka architecture. Something that slipped my mind is that it is leader based, so the topic/partitionId pair will be the same on all the brokers. So we do not need to consider the brokerId while storing offsets. Still exploring the rest of the items.
> > regards
> > Sunita
> >
> > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind <sunitarv...@gmail.com> wrote:
> >>
> >> Hello Experts,
> >>
> >> I am trying to use the saving-to-ZK design. Just saw Sudhir's comments that it is an old approach. Any reasons for that? Any issues observed with saving to ZK? The way we are planning to use it is:
> >> 1. Following http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
> >> 2. Saving to the same file with offsetRange as a part of the file. We hope that there are no partial writes/ overwriting is possible and offsetRanges
> >>
> >> However, I have the doubts below, which I couldn't figure out from the code here:
> >> https://github.com/ippontech/spark-kafka-source/blob/master/src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
> >> 1. The brokerId is not part of the OffsetRange. How will just the partitionId:FromOffset stay unique in a cluster with multiple brokers and multiple partitions per topic?
> >> 2. Do we have to specify zkPath to include the partitionId?
> >> I tried using the ZookeeperOffsetStore as is, and it required me to specify the partitionId:
> >>
> >> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), "/consumers/topics/" + topics + "/0")
> >>
> >> For our use cases it is too limiting to include the partitionId in the path.
> >> To get it to work by automatically detecting the existing partitions for a given topic, I changed it as below (inspired by
> >> http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):
> >>
> >> /**
> >>  * groupID consumer group to get offsets for
> >>  * topic topic to get offsets for
> >>  * return - "partition:offset" pairs joined by commas, or None if no partitions were found
> >>  */
> >> private def getOffsets(groupID: String, topic: String): Option[String] = {
> >>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
> >>   val offsets = new mutable.HashMap[TopicAndPartition, Long]()
> >>   // getPartitionsForTopics returns a map of topic -> partition ids,
> >>   // so look up the partition list for this topic before iterating
> >>   val partitions = ZkUtils.getPartitionsForTopics(zkClient, List(topic))
> >>     .getOrElse(topic, Seq.empty)
> >>   for (partition <- partitions) {
> >>     val partitionOffsetPath = topicDirs.consumerOffsetDir + "/" + partition
> >>     val maybeOffset: Option[String] =
> >>       ZkUtils.readDataMaybeNull(zkClient, partitionOffsetPath)._1
> >>     // Default to offset 0 when nothing has been saved for this partition
> >>     val offset: Long = maybeOffset.map(_.toLong).getOrElse(0L)
> >>     offsets.put(TopicAndPartition(topic, partition), offset)
> >>   }
> >>   // Serialize as "partition:offset,..." because readOffsets splits on "," and ":"
> >>   if (offsets.isEmpty) None
> >>   else Some(offsets.map { case (tp, o) => s"${tp.partition}:$o" }.mkString(","))
> >> }
> >>
> >> // Read the previously saved offsets from Zookeeper
> >> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
> >>
> >>   LogHandler.log.info("Reading offsets from ZooKeeper")
> >>
> >>   // Start the timer before the ZooKeeper read so the logged duration includes it
> >>   val start = System.currentTimeMillis()
> >>   val offsetsRangesStrOpt = getOffsets(consumerGrp, topic)
> >>   offsetsRangesStrOpt match {
> >>     case Some(offsetsRangesStr) =>
> >>       LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")
> >>
> >>       val offsets = offsetsRangesStr.split(",")
> >>         .map(s => s.split(":"))
> >>         .map { case Array(partitionStr, offsetStr) =>
> >>           (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }
> >>         .toMap
> >>
> >>       LogHandler.log.info("Done reading offsets from ZooKeeper. Took " +
> >>         (System.currentTimeMillis() - start))
> >>
> >>       Some(offsets)
> >>     case None =>
> >>       LogHandler.log.info("No offsets found in ZooKeeper. Took " +
> >>         (System.currentTimeMillis() - start))
> >>       None
> >>   }
> >>
> >> }
> >>
> >> However, I am concerned about whether saveOffsets will work well with this approach. That's when I realized we are not considering brokerIds while storing offsets, and the OffsetRanges probably do not have them either. They can only provide topic, partition, from and until offsets.
> >>
> >> I am probably missing something very basic. Probably the library works well by itself. Can someone/Cody explain?
> >>
> >> Cody, thanks a lot for sharing your work.
> >>
> >> regards
> >> Sunita
> >>
> >>
> >> On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >>>
> >>> See
> >>> https://github.com/koeninger/kafka-exactly-once
> >>>
> >>> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed" <mdkhajaasm...@gmail.com> wrote:
> >>>>
> >>>> Hi Experts,
> >>>>
> >>>> I am looking for some information on how to achieve zero data loss while working with Kafka and Spark. I have searched online, and the blogs give different answers. Please let me know if anyone has an idea on this.
> >>>>
> >>>> Blog 1:
> >>>> https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html
> >>>>
> >>>> Blog 2:
> >>>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
> >>>>
> >>>> Blog 1 simply says to make a configuration change with a checkpoint directory, and Blog 2 gives details on how to save offsets to ZooKeeper. Can you please help me out with the right approach?
> >>>>
> >>>> Thanks,
> >>>> Asmath
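
On the offset string format discussed in this thread: readOffsets expects comma-separated "partition:offset" pairs, so whatever writes or assembles the string has to produce exactly that shape. Below is a minimal, self-contained sketch of such a round trip. It uses plain Int partition ids instead of TopicAndPartition so it runs without Kafka on the classpath, and the OffsetCodec object with its encode and decode methods is a hypothetical name for illustration, not part of spark-kafka-source or Kafka.

```scala
// Hypothetical helper for illustration only; not part of any library.
object OffsetCodec {

  // Encode partition -> offset pairs as "0:42,1:17", ordered by partition id.
  // Returns None for an empty map so callers can treat it as "no offsets saved".
  def encode(offsets: Map[Int, Long]): Option[String] =
    if (offsets.isEmpty) None
    else Some(offsets.toSeq.sortBy(_._1).map { case (p, o) => s"$p:$o" }.mkString(","))

  // Decode the string back into a map. Entries that are not of the form
  // "<int>:<long>" are skipped rather than throwing a MatchError.
  def decode(s: String): Map[Int, Long] =
    s.split(",").toList.flatMap { entry =>
      entry.split(":") match {
        case Array(p, o) => scala.util.Try(p.trim.toInt -> o.trim.toLong).toOption
        case _           => None
      }
    }.toMap
}
```

With this shape, a single ZooKeeper node per topic could hold the offsets for every partition, which is one way to avoid putting the partitionId in the zkPath; whether that suits a given deployment is an assumption to check against how saveOffsets writes the data.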