TCP is fundamentally a stream protocol, if you need to transmit messages that are larger than one byte you will definitely have to implement a framing protocol on top—that is not best practice but strictly required. The fact that you get multiple bytes at once in your Actors is due to the O/S kernel giving us chunks of bytes as they are read from the socket, but how those chunks are divided is arbitrary and can be influenced by any network component on the way. There is no bug in Akka here.
Regards, Roland > 18 jun 2015 kl. 09:44 skrev Dmitrii Galagaev <[email protected]>: > > Yes, I know that it's one of best practice when implementing tcp sockets > protocol. But I am using TCP Actors from Akka framework and there is no any > stream. I know that could fixing it by concatenation ByteString from > different TCP.Received message. But for me it looks like hack. Because it > looks like it's Akka responsibility, because in most times I have got full > response in TCP.Received message. It looks like some bug in Akka > > you need a frame based codec. > LV,L->Length,V->Value > > 在 2015年6月16日星期二 UTC+8下午7:17:26,Dmitrii Galagaev写道: > I am using TCP(http://doc.akka.io/docs/akka/snapshot/java/io-tcp.html > <http://doc.akka.io/docs/akka/snapshot/java/io-tcp.html>) actor as entry > point to my java application: > > public SupervisorActor() { > receive(ReceiveBuilder. > match(Tcp.Connected.class, msg -> { > if (LOGGER.isDebugEnabled()) { > LOGGER.debug("Connecting to socket " + > getHostAndPort(msg.localAddress())); > } > getContext().become(connected(sender())); > sender().tell(TcpMessage.register(self()), self()); > if (LOGGER.isDebugEnabled()) { > LOGGER.debug("Configuration completed"); > } > }). > match(ActorRequest.class, actorRequest -> { > ActorRef actorRef = availableActors.get(actorRequest.getAddress()); > if (actorRef != null) { > actorRef.tell(actorRequest, getContext().self()); > } > }). > match(Tcp.Bound.class, msg -> { > if (LOGGER.isDebugEnabled()) { > LOGGER.debug("Open socket " + getHostAndPort(msg.localAddress())); > } > }). > match(Tcp.Received.class, msg -> { > if (LOGGER.isDebugEnabled()) { > LOGGER.debug("Receiving request before connected" + new > String(msg.data().toArray())); > } > }). > matchAny(msg -> { > if (LOGGER.isDebugEnabled()) { > LOGGER.debug("received unknown message " + getResponseTest(msg)); > } > }) > .build()); > } > > After client connected to socket I changed behaviour to: > > private PartialFunction<Object, BoxedUnit> connected(ActorRef connection) > { > return ReceiveBuilder. > match(Tcp.Received.class, msg -> { > if (LOGGER.isDebugEnabled()) { > LOGGER.debug("Receiving request " + new > String(msg.data().toArray())); > } > ByteString byteString = msg.data(); > ActorRequest actorRequest = > OBJECT_MAPPER.readValue(byteString.toArray(), ActorRequest.class); > ActorRef actorRef = availableActors.get(actorRequest.getAddress()); > if (actorRef == null) { > ActorResponse errorActorResponse = > ActorResponse.createErrorResponse("Unknown address " + > actorRequest.getAddress()); > errorActorResponse.setAddress(actorRequest.getAddress()); > errorActorResponse.setKey(actorRequest.getKey()); > ByteString errorResponse = sendResponse(errorActorResponse); > connection.tell(TcpMessage.write(errorResponse, > TcpMessage.noAck()), self()); > } else { > actorRef.tell(actorRequest, getContext().self()); > } > }). > match(ActorResponse.class, response -> { > ByteString byteString = sendResponse(response); > if (LOGGER.isDebugEnabled()) { > LOGGER.debug("Sending response " + new > String(byteString.toArray())); > } > connection.tell(TcpMessage.write(byteString, TcpMessage.noAck()), > self()); > }). > match(Tcp.CommandFailed.class, response -> LOGGER.warn("Socket > failed")). > match(Tcp.ConnectionClosed.class, msg -> { > if (LOGGER.isDebugEnabled()) { > LOGGER.debug("Connection closed" + > ""); > } > getContext().stop(self()); > }). > matchAny( > msg -> { > if (LOGGER.isDebugEnabled()) { > LOGGER.debug("Received unknown message"); > LOGGER.debug("Receiving request " + getResponseTest(msg)); > } > }). > build(); > } > > It's work fine most time, but sometimes I have got only part of client > request in TCP.Received object, so than JSON parsing is failed. And then I > received the second part of request in not connected state: > > 2015-06-16 13:43:49.443 [FunGo-akka.actor.default-dispatcher-3] DEBUG > by.adhoc.fungo.actor.SupervisorActor - Receiving request > {"address":"structure.import","key":"{\"socketId\":\"LSnQGhc-UtYLrbz0AAAB\",\"_idProject\":\"557ee12575a2e49c0ba6e3ef\",\"_idUser\":\"557ee10375a2e49c0ba6e3ea\",\"pointer\":\"557ffde2405a53101a8cc933\"}","body":"UEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAADAAAAYnkvUEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAJAAAAYnkvYWRob2MvUEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAPAAAAYnkvYWRob2MvZnVuZ28vUEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAVAAAAYnkvYWRob2MvZnVuZ28vbW9kZWwvUEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAbAAAAYnkvYWRob2MvZnVuZ28vbW9kZWwvc2FsZXMvUEsDBBQAAAAIAFpxlkaAw61imQEAAJ4DAAAoAAAAYnkvYWRob2MvZnVuZ28vbW9kZWwvc2FsZXMvQWRkcmVzcy5jbGFzc4WRz0rDQBDGv01rYmLUqq3W/1Y8VAWDZ6UgBU9FDxXvabqtW2IC2VTwrRQUwYMP4EOJs3GlEiNeZmYn833z2837x+sbgGPsOjCxYWMKmypsqbBtYcdCg6EciPSeYbEz8u98L/SjoddNExENTxhMmSacp1TEg4EIOBWnIhJpi6HU3L8mcTvuU3e+IyJ+Mb7t8eTK74Vc2cWBH177iVBn3SynN0Iy7HV6957fv4kDbzCOhrF3SyahJ/2QS++s30+4lLTcGvK0nbFVm/tFdJb8Hqg1f39XeDZZdPUVbPmjpv6lvpEtJ7XTjcdJwM+FonU1ypGydjED14WFaRe2Cg6mGXb/vwhDZYJ22RvxIEWD3t+kP1OCoRypMmDr7GTZAlMLKc7SyaPMKE8dvIA9ZmNzFM2saWKeovs1gAoWKNP7Y+mX+AnGQ07sFoqrqGlxi6YNNX1w+AxjstrJuhXSLGQOy19T2kFV6l5q8XIhSCkPUi0EWSkGKeVB6qRZ/QPE1iD1QpByHmSjEGS1GKScB9khTeMPEEeDrGWa9U9QSwMEFAAAAAgAWnGWRgNkOS4QAgAAIwUAACkAAABieS9hZGhvYy9mdW5nby9tb2RlbC9zYWxlcy9DdXN0b21lci5jbGFzc42S3W7TQBCFjx3HjoND27QJtFAgLQE7hRquQZVQERJSVJCCetMr210cV46N/IPEW4EEQuKCB+ChELMbtwFnI3EzY++cOfPt2L9+//gJ4CmetGHAMaFjxMMBD494eGzg0ICrQEu8GVPQHV94Hz039pLQnRRZlITPFDS8kEqbf5VeJwULWUa1Zh6k > 2015-06-16 13:43:49.459 [FunGo-akka.actor.default-dispatcher-6] DEBUG > by.adhoc.fungo.actor.SupervisorActor - Receiving request before > connectedWa3xZVr6MaOi/mGaJiyn3jOZr/48SqLiiAbYzikhHKfn5LQ2jhJ2Us58lr3z/Fh4p4EXn3pZxN+rQ62YRuQ8HPufXO98mgbu+zIJU3dGJrGbezHL3eMyL9KZ4DRCVpyIK27ZjgzGyC8FPXu5zvl0sngRCoEj3YWeV4K+LalzixZZTOYL+xfjamWt/ErRs5cF3MQkk7fVYglFulozX2j69pn8Qu1JWmYBexXxfXYul3XItRbWsWGhBdNCG9csWOhYuI41Bfv/sXAF64uJb/wLFhQY0O9m0N+oQeW+9KRya5HJXWQaQNmEwsdT7NKbS1mh3Bx9h/JFyDYp6uJQxxZFay5AD33KCm7g5lLzV6ifa80dafM2dqrmI1KrXD06+AZ1MbotTjeopysc+nNV5cCfWuIOCm7htgSkUQfpSUF2cUcG0qiDbFPPzgoQvmA++C7uSUC0OsiuFGSAPRmIVgcZUM/eChD+hfngfdyXgDTrIEMpyBAPZCDNOohDPaMVIPwX44Mfih77D1BLAwQUAAAACABacZZGaSHwO4kBAABrAwAAKAAAAGJ5L2FkaG9jL2Z1bmdvL21vZGVsL3NhbGVzL1Byb2R1Y3QuY2xhc3ONUctKw0AUPdNXbI22ttXW9wORtILBteKm4KpUQXHjapqObUqaQB6Cf6WgCC78AD9KvDNGijVqN3dm7pxzzzkzb+8vrwAOsVVAFqt5ZLAmy7qGDQ2bDBmXjwRDuT3kt9x0uNs3L0LfdvtHDHkrCkJvJPyAYe+63b0zeW/gWeZN5PY9c+T1hGMG3BGB2YqBRMod264dnjCkjcYVzW8RjKHYtl3RiUZd4V/yrqMUPYs7V9y35TluZsKBTWK7f2md+14vskKS0voi7Cj7VaORFEALvgCLxs97aU+nEa1xSsNoTJtTD74xG8Z0RClauPAi3xKntoysx3kOpD8dBczqyEHTMYM8w87/D8FQGkc76w6FFWKbvjhL/55CSg5TO5pHaxZMalDV6WTSymS3+Qz2oGBzVHOqmcM8Vf0TgCJKtDIsoPyD/IjU/QR5NpFcQTUmnyhvhG7uPyE1li6obpE4JTVh6RMVT5A7mUcKL2IpwUh60kgl0UgN9SQj6UkjNeLUfzEiH1QKLyvOygdQSwECHwAKAAAAAABacZZGAAAAAAAAAAAAAAAAAwAkAAAAAAAAABAgAAAAAAAAYnkvCgAgAAAAAAABABgAMGli/+x80AFHcWL/7HzQAUdxYv/sfNABUEsBAh8ACgAAAAAAWnGWRgAAAAAAAAAAAAAAAAkAJAAAAAAAAAAQIAAAIQAAAGJ5L2FkaG9jLwoAIAAAAAAAAQAYADBpYv/sfNABR3Fi/+x80AFHcWL/7HzQAVBLAQIfAAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAPACQAAAAAAAAAECAAAEgAAABieS9hZGhvYy9mdW5nby8KACAAAAAAAAEAGADQYGb/7HzQAX9oZv/sfNABR3Fi/+x80AFQSwECHwAKAAAAAABacZZGAAAAAAAAAAAAAAAAFQAkAAAAAAAAABAgAAB1AAAAYnkvYWRob2MvZnVuZ28vbW9kZWwvCgAgAAAAAAABABgAkFNj/+x80AGPW2P/7HzQAY9bY//sfNABUEsBAh8ACgAAAAAAWnGWRgAAAAAAAAAAAAAAABsAJAAAAAAAAAAQIAAAqAAAAGJ5L2FkaG9jL2Z1bmdvL21vZGVsL3NhbGVzLwoAIAAAAAAAAQAYACB3bv/sfNAB+31u/+x80AGPW2P/7HzQAVBLAQIfABQAAAAIAFpxlkaAw61imQEAAJ4DAAAoACQAAAAAAAAAICAAAOEAAABieS9hZGhvYy9mdW5nby9tb2RlbC9zYWxlcy9BZGRyZXNzLmNsYXNzCgAgAAAAAAABABgA8D1k/+x80AHLHmT/7HzQAcseZP/sfNABUEsBAh8AFAAAAAgAWnGWRgNkOS4QAgAAIwUAACkAJAAAAAAAAAAgIAAAwAIAAGJ5L2FkaG9jL2Z1bmdvL21vZGVsL3NhbGVzL0N1c3RvbWVyLmNsYXNzCgAgAAAAAAABABgAsKFj/+x80AGPW2P/7HzQAY9bY//sfNABUEsBAh8AFAAAAAgAWnGWRmkh8DuJAQAAawMAACgAJAAAAAAAAAAgIAAAFwUAAGJ5L2FkaG9jL2Z1bmdvL21vZGVsL3NhbGVzL1Byb2R1Y3QuY2xhc3MKACAAAAAAAAEAGAAwnm7/7HzQAft9bv/sfNAB+31u/+x80AFQSwUGAAAAAAgACABUAwAA5gYAAAAA"} > 2015-06-16 13:43:49.462 [FunGo-akka.actor.default-dispatcher-4] ERROR > akka.actor.OneForOneStrategy - Unexpected end-of-input in VALUE_STRING > at [Source: [B@7174d2bd; line: 1, column: 2857] (through reference > chain: > by.adhoc.fungo.dto.ActorRequest["body"])com.fasterxml.jackson.databind.JsonMappingException: > Unexpected end-of-input inVALUE_STRING > at [Source: [B@7174d2bd; line: 1, column: 2857] (through reference > chain: by.adhoc.fungo.dto.ActorRequest["body"]) > at > com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210) > ~[jackson-databind-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177) > ~[jackson-databind-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.databind.deser.impl.BeanPropertyMap.wrapAndThrow(BeanPropertyMap.java:439) > ~[jackson-databind-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.databind.deser.impl.BeanPropertyMap.findDeserializeAndSet(BeanPropertyMap.java:287) > ~[jackson-databind-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:248) > ~[jackson-databind-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:136) > ~[jackson-databind-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3562) > ~[jackson-databind-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2671) > ~[jackson-databind-2.5.3.jar:2.5.3] > at > by.adhoc.fungo.actor.SupervisorActor.lambda$connected$11(SupervisorActor.java:102) > ~[SupervisorActor.class:na] > at > by.adhoc.fungo.actor.SupervisorActor$$Lambda$18/251306795.apply(Unknown > Source) ~[na:na] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > ~[akka-actor_2.11-2.3.11.jar:na] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > ~[akka-actor_2.11-2.3.11.jar:na] > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > ~[scala-library-2.11.5.jar:na] > at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > ~[akka-actor_2.11-2.3.11.jar:na] > at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > ~[scala-library-2.11.5.jar:na] > at akka.actor.Actor$class.aroundReceive(Actor.scala:467) > ~[akka-actor_2.11-2.3.11.jar:na] > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:47) > ~[akka-actor_2.11-2.3.11.jar:na] > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > [akka-actor_2.11-2.3.11.jar:na] > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > [akka-actor_2.11-2.3.11.jar:na] > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) > [akka-actor_2.11-2.3.11.jar:na] > at akka.dispatch.Mailbox.run(Mailbox.scala:220) > [akka-actor_2.11-2.3.11.jar:na] > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > [akka-actor_2.11-2.3.11.jar:na] > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)[scala-library-2.11.5.jar:na] > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > [scala-library-2.11.5.jar:na] > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > [scala-library-2.11.5.jar:na] > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > [scala-library-2.11.5.jar:na] > Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected > end-of-input in VALUE_STRING > at [Source: [B@7174d2bd; line: 1, column: 2857] > at > com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487) > ~[jackson-core-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518) > ~[jackson-core-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:455) > ~[jackson-core-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:451) > ~[jackson-core-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.core.base.ParserBase.loadMoreGuaranteed(ParserBase.java:460) > ~[jackson-core-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2306) > ~[jackson-core-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString(UTF8StreamJsonParser.java:2287) > ~[jackson-core-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:286) > ~[jackson-core-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserialize(UntypedObjectDeserializer.java:464) > ~[jackson-databind-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:523) > ~[jackson-databind-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:95) > ~[jackson-databind-2.5.3.jar:2.5.3] > at > com.fasterxml.jackson.databind.deser.impl.BeanPropertyMap.findDeserializeAndSet(BeanPropertyMap.java:285) > ~[jackson-databind-2.5.3.jar:2.5.3] > ... 22 common frames omitted > 2015-06-16 13:52:49.449 [FunGo-akka.actor.default-dispatcher-10] DEBUG > by.adhoc.fungo.actor.SupervisorActor - received unknown message PeerClosed > > > The idea that this error occurs sometimes and for same data that used in > successful requests. So I just write same data to socket for many times and > sometimes it fails. Also I enabled logging in application.conf: > > akka { > loggers = ["akka.event.slf4j.Slf4jLogger"] > loglevel = "DEBUG" > > debug { > # enable function of Actor.loggable(), which is to log any received > message > # at DEBUG level, see the “Testing Actor Systems” section of the Akka > # Documentation at http://akka.io/docs <http://akka.io/docs> > receive = true > > # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill > et.c.) > autoreceive = true > > # enable DEBUG logging of actor lifecycle changes > lifecycle = true > > # enable DEBUG logging of all LoggingFSMs for events, transitions and > timers > fsm = true > > # enable DEBUG logging of subscription changes on the eventStream > event-stream = true > > # enable DEBUG logging of unhandled messages > unhandled = true > > # enable WARN logging of misconfigured routers > router-misconfiguration = true > } > } > > > > > > > -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ <http://akka.io/docs/> > >>>>>>>>>> Check the FAQ: > >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> <http://doc.akka.io/docs/akka/current/additional/faq.html> > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > >>>>>>>>>> <https://groups.google.com/group/akka-user> > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected] > <mailto:[email protected]>. > To post to this group, send email to [email protected] > <mailto:[email protected]>. > Visit this group at http://groups.google.com/group/akka-user > <http://groups.google.com/group/akka-user>. > For more options, visit https://groups.google.com/d/optout > <https://groups.google.com/d/optout>. Dr. Roland Kuhn Akka Tech Lead Typesafe <http://typesafe.com/> – Reactive apps on the JVM. twitter: @rolandkuhn <http://twitter.com/#!/rolandkuhn> -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
