Hey guys,

I am new to akka streams so I may be missing something big here. Using 
akka-stream, akka-http-core, akka-typed-experimental all 2.4.10 - scala 
2.11.8.

I am experiencing the `connection reset by peer` error on a stream that I 
have checked with `curl` to remain open far longer than the time it takes 
akka to crash.

The stream is produced from json lines like so: 

```

def stream(): Source[Development, Any] = {

  def chunkConsumer(res: HttpResponse) = {
    res.entity.dataBytes
      .via(Framing.delimiter(ByteString("\r\n"), 1048576, allowTruncation = 
true))
      .map[Development](bs => { parse(bs.utf8String).extract[Development] })
  }

  val req = HttpRequest(method = HttpMethods.GET, uri = 
s"http://$endpoint:$port/developments/stream";)
  val res: Source[Development, Any] = 
Source.single(req).via(client).flatMapConcat(chunkConsumer)

  res
}

``` 

And consumed like so:

```

def slowAsyncTransformation(d: Development) = {
  Future { Thread sleep 50; d }
}

val parallelism = 2

DevelopmentsAPI
  .stream()
  .mapAsync(parallelism)(slowAsyncTransformation(_))
  .runFold(0)((acc, d) => {
    System.out.println("processing development " + d.id)
    acc + 1
  })
  .onComplete {
    case Success(s) => {
      System.out.println(s"processed $s developments")
      system.terminate()
      Await.result(system.whenTerminated, 5 seconds)
      System.exit(0)
    }
    case Failure(ex) => {
      System.err.println(s"Could not finish processing developments: $ex")
      system.terminate()
      Await.result(system.whenTerminated, 5 seconds)
      System.exit(1)
    }
  }

```


I can consistently get this to work from end to end in 180 seconds if I have 
`parallelism` set to 2. 


But if I set `parallelism` to 1, it crashes in 35 seconds with `Could not 
finish processing developments: akka.stream.StreamTcpException: The 
connection closed with error: Connection reset by peer`

I cannot make sense of why is the degree of parallelism being 1 would cause 
the peer to reset the connection. 

Any help will be greatly appreciated.

Thanks, 

Eric  

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to