/** Adapts `stream` to an `Iterator` of (LSN, payload) pairs.
  *
  * Each call to `next()` reads one message from `stream`, decodes the bytes of its
  * backing array (from the array offset to the end of the array) into a `String`,
  * and pairs it with `stream.getLastReceiveLSN`.
  *
  * If reading or decoding fails with a non-fatal error, the sentinel
  * `(LogSequenceNumber.INVALID_LSN, "")` is returned instead of propagating the
  * failure; consumers are expected to recognise and skip that sentinel.
  *
  * The iterator reports more elements for as long as `stream` is not closed.
  */
object stream2iter extends Iterator[(LogSequenceNumber, String)] {
  import scala.util.control.NonFatal

  /** Reads and decodes the next message, or yields the INVALID_LSN sentinel on failure. */
  override def next(): (LogSequenceNumber, String) =
    try {
      val msg = stream.read
      val offset = msg.arrayOffset
      val src = msg.array
      val len = src.length - offset
      // NOTE(review): decodes with the platform default charset — confirm this
      // matches the encoding of the incoming messages.
      val log = new String(src, offset, len)
      val tmpLSN = stream.getLastReceiveLSN

      (tmpLSN, log)
    } catch {
      // Only non-fatal failures are mapped to the sentinel; fatal JVM errors and
      // thread interrupts must propagate rather than be silently swallowed.
      case NonFatal(_) =>
        (LogSequenceNumber.INVALID_LSN, "")
    }

  /** True while the underlying stream has not been closed. */
  override def hasNext: Boolean = !stream.isClosed
}

// Exposes `stream2iter` as a stream source. The factory lambda hands back the
// same singleton iterator every time it is invoked.
// NOTE(review): Akka's `Source.fromIterator` is documented to pull from the
// iterator only when downstream signals demand — confirm that this already
// provides the back pressure needed here.
val srcStream: Source[(LogSequenceNumber, String), NotUsed] =
  Source.fromIterator { () => stream2iter }

// Terminal stage: logs every record and remembers the LSN of the most recent
// valid one in `lastReceiveLSN`. Records carrying the INVALID_LSN sentinel are
// logged but do not update `lastReceiveLSN`.
val sink: Sink[LogRecord, Future[Done]] = Sink.foreach { log =>
  logger.debug(s"LSN: ${log.lsn}, target: ${log.target.fullname}")
  if (log.lsn != LogSequenceNumber.INVALID_LSN) {
    lastReceiveLSN = log.lsn
  }
}

// Full pipeline: source -> `graph(system)` -> sink.
// The graph is declared to materialize to `NotUsed`, so the sink's
// `Future[Done]` is not exposed through `res`.
val totalG: RunnableGraph[NotUsed] =
  srcStream
    .via(graph(system))
    .to(sink)

// Materializes and starts the pipeline.
val res = totalG.run()

// Currently, stream.read is too fast (the producer is fast, but the consumer
// is slower), so can I use back pressure in the iterator?
// Can someone help me? :-)

