Hello, everyone:
 I am a junior software engineer. I tried to rewrite a Stream as a sink and 
found that when I read a large database update, an OutOfMemory exception is 
thrown. Can anyone give me some advice? What am I doing wrong? Thanks :-)

/** Starts the replication-log processing loop.
  *
  * Creates an actor system, subscribes a dead-letter listener, opens a SQL
  * connection and a replication connection, and then loops forever: each pass
  * (re)connects if needed, starts a replication stream, exposes it as an
  * iterator-backed Akka `Source`, and runs it through `graph(system)` into a
  * sink that remembers the last received LSN.
  *
  * NOTE(review): this snippet appears to have lost lines when it was pasted
  * (closing braces are missing, and the right-hand side of `lastLSNOpt` is
  * absent), so the comments below describe only what is visible.
  *
  * @param url       connection URL passed to both connection helpers
  * @param user      user for the plain SQL connection
  * @param password  password for the plain SQL connection
  * @param replUser  user for the replication connection
  * @param replPass  password for the replication connection
  */
def run(url: String, user: String, password: String, replUser: String, 
replPass: String): Unit = {
  implicit val system: ActorSystem = ActorSystem("log-analyzer")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContextExecutor = system.dispatcher

  // Actor that logs every DeadLetter it receives at error level.
  class DeadLetterListener extends Actor {
    def receive: PartialFunction[Any, Unit] = {
      case dl: DeadLetter ⇒
        logger.error("dead letter: {}", dl)

  // Register the listener on the event stream for DeadLetter messages.
  val listener = system.actorOf(Props[DeadLetterListener])
  system.eventStream.subscribe(listener, classOf[DeadLetter])


  // check last confirmed lsn
  val sqlConnection: Connection = this.getSqlConnection(url, user, password)
  // NOTE(review): the expression that computes lastLSNOpt is missing from this snippet.
  val lastLSNOpt: Option[LogSequenceNumber] = 

  // The stream stays null until it is started inside the loop below.
  var stream: PGReplicationStream = null
  var replConnection: PGConnection = this.getReplicationConnection(url, 
replUser, replPass)

  // Log whether a previous LSN was found. The first message reads
  // "no slot information found, initializing slot: <SLOT_NAME>", the second
  // "starting from <lsn>".
  if (lastLSNOpt.isEmpty) {
    logger.info(s"未发现slot信息,初始化slot: $SLOT_NAME")
  } else {
    logger.info(s"从 ${lastLSNOpt.get} 开始")

  // start stream flow process
  //    val counter = new AtomicInteger(0)

  // NOTE(review): lastTs is not read anywhere in the visible code.
  var lastTs = System.currentTimeMillis()
  // Updated by the sink below with the LSN of each valid record.
  var lastReceiveLSN = LogSequenceNumber.INVALID_LSN

  // NOTE(review): the graph below is typed RunnableGraph[NotUsed], so run() does
  // not hand back a Future to wait on. Nothing visible here waits for the stream
  // to finish before the next pass of this loop, which would start another
  // replication stream and another graph each time round — confirm against the
  // full code, as this could contribute to the reported memory growth.
  while (true) {
    try {
      // Reconnect when the replication connection is missing or already closed.
      if (replConnection == null || 
replConnection.asInstanceOf[Connection].isClosed) {
        replConnection = this.getReplicationConnection(url, replUser, replPass)

      // NOTE(review): the builder calls between `replConnection` and
      // `.withSlotOption` appear to be missing from this snippet — TODO confirm.
      stream = replConnection
        .withSlotOption("include-xids", false)
        //          .withSlotOption("skip-empty-xacts", true) 
        .start() // start replication stream

      // Adapts the replication stream to an Iterator of (LSN, decoded message text).
      object stream2iter extends Iterator[(LogSequenceNumber, String)] {
        override def next: (LogSequenceNumber, String) = {
          try {
            // Author's report: OutOfMemory is thrown here on a big database update.
            // The whole message is taken as a byte array and then copied again
            // into a String, so one message is held in memory at least twice.
            val msg = stream.read
            val offset = msg.arrayOffset
            val src = msg.array
            val len = src.length - offset
            val log = new String(src, offset, len)
            val tmpLSN = stream.getLastReceiveLSN

            (tmpLSN, log)
          } catch {
            // Catches every Throwable (OutOfMemoryError is one), prints it, and
            // returns a sentinel pair instead of propagating the failure.
            case e: Throwable =>
              println("localhost: " + e)
              (LogSequenceNumber.INVALID_LSN, "")

        // The iterator reports more elements for as long as the stream is open.
        override def hasNext: Boolean = !stream.isClosed

      val srcStream: Source[(LogSequenceNumber, String), NotUsed] = 
Source.fromIterator(() => stream2iter)

      // Logs each record at debug level and remembers its LSN, skipping the
      // INVALID_LSN sentinel.
      val sink: Sink[LogRecord, Future[Done]] = Sink.foreach(log => {
        logger.debug(s"LSN: ${log.lsn}, target: ${log.target.fullname}")
        if (log.lsn != LogSequenceNumber.INVALID_LSN)
          lastReceiveLSN = log.lsn


      // Source -> graph(system) -> sink; `res` is not used in the visible code.
      val totalG: RunnableGraph[NotUsed] = srcStream.via(graph(system)).to(sink)
      val res = totalG.run()
    } catch {
      // Disconnections are a normal, expected occurrence.
      case e: PSQLException =>
        logger.warn("Connection closed, retrying...", e)
        val conn = replConnection.asInstanceOf[Connection]
        // NOTE(review): the body of this `if` is missing from the snippet.
        if (!conn.isClosed) {
        // Reset to null so the next loop pass opens a new replication connection.
        replConnection = null
      // Any other failure is logged and handed to shutdown(system, e).
      case e: Throwable =>
        logger.error("Connection can not be established!", e)
        shutdown(system, e)


