Yes, I know that it's one of best practice when implementing tcp sockets protocol. But I am using TCP Actors from Akka framework and there is no any stream. I know that could fixing it by concatenation ByteString from different TCP.Received message. But for me it looks like hack. Because it looks like it's Akka responsibility, because in most times I have got full response in TCP.Received message. It looks like some bug in Akka
you need a frame based codec. > LV,L->Length,V->Value > > 在 2015年6月16日星期二 UTC+8下午7:17:26,Dmitrii Galagaev写道: >> >> I am using TCP(http://doc.akka.io/docs/akka/snapshot/java/io-tcp.html) >> actor as entry point to my java application: >> >> public SupervisorActor() { >> receive(ReceiveBuilder. >> match(Tcp.Connected.class, msg -> { >> if (LOGGER.isDebugEnabled()) { >> LOGGER.debug("Connecting to socket " + >> getHostAndPort(msg.localAddress())); >> } >> getContext().become(connected(sender())); >> sender().tell(TcpMessage.register(self()), self()); >> if (LOGGER.isDebugEnabled()) { >> LOGGER.debug("Configuration completed"); >> } >> }). >> match(ActorRequest.class, actorRequest -> { >> ActorRef actorRef = >> availableActors.get(actorRequest.getAddress()); >> if (actorRef != null) { >> actorRef.tell(actorRequest, getContext().self()); >> } >> }). >> match(Tcp.Bound.class, msg -> { >> if (LOGGER.isDebugEnabled()) { >> LOGGER.debug("Open socket " + >> getHostAndPort(msg.localAddress())); >> } >> }). >> match(Tcp.Received.class, msg -> { >> if (LOGGER.isDebugEnabled()) { >> LOGGER.debug("Receiving request before connected" + new >> String(msg.data().toArray())); >> } >> }). >> matchAny(msg -> { >> if (LOGGER.isDebugEnabled()) { >> LOGGER.debug("received unknown message " + >> getResponseTest(msg)); >> } >> }) >> .build()); >> } >> >> After client connected to socket I changed behaviour to: >> >> private PartialFunction<Object, BoxedUnit> connected(ActorRef >> connection) { >> return ReceiveBuilder. >> match(Tcp.Received.class, msg -> { >> if (LOGGER.isDebugEnabled()) { >> LOGGER.debug("Receiving request " + new >> String(msg.data().toArray())); >> } >> ByteString byteString = msg.data(); >> ActorRequest actorRequest = >> OBJECT_MAPPER.readValue(byteString.toArray(), ActorRequest.class); >> ActorRef actorRef = >> availableActors.get(actorRequest.getAddress()); >> if (actorRef == null) { >> ActorResponse errorActorResponse = >> ActorResponse.createErrorResponse("Unknown address " + >> actorRequest.getAddress()); >> errorActorResponse.setAddress(actorRequest.getAddress()); >> errorActorResponse.setKey(actorRequest.getKey()); >> ByteString errorResponse = sendResponse(errorActorResponse); >> connection.tell(TcpMessage.write(errorResponse, >> TcpMessage.noAck()), self()); >> } else { >> actorRef.tell(actorRequest, getContext().self()); >> } >> }). >> match(ActorResponse.class, response -> { >> ByteString byteString = sendResponse(response); >> if (LOGGER.isDebugEnabled()) { >> LOGGER.debug("Sending response " + new >> String(byteString.toArray())); >> } >> connection.tell(TcpMessage.write(byteString, >> TcpMessage.noAck()), self()); >> }). >> match(Tcp.CommandFailed.class, response -> LOGGER.warn("Socket >> failed")). >> match(Tcp.ConnectionClosed.class, msg -> { >> if (LOGGER.isDebugEnabled()) { >> LOGGER.debug("Connection closed" + >> ""); >> } >> getContext().stop(self()); >> }). >> matchAny( >> msg -> { >> if (LOGGER.isDebugEnabled()) { >> LOGGER.debug("Received unknown message"); >> LOGGER.debug("Receiving request " + >> getResponseTest(msg)); >> } >> }). >> build(); >> } >> >> It's work fine most time, but sometimes I have got only part of client >> request in TCP.Received object, so than JSON parsing is failed. And then I >> received the second part of request in not connected state: >> >> 2015-06-16 13:43:49.443 [FunGo-akka.actor.default-dispatcher-3] DEBUG >> by.adhoc.fungo.actor.SupervisorActor - Receiving request >> {"address":"structure.import","key":"{\"socketId\":\"LSnQGhc-UtYLrbz0AAAB\",\"_idProject\":\"557ee12575a2e49c0ba6e3ef\",\"_idUser\":\"557ee10375a2e49c0ba6e3ea\",\"pointer\":\"557ffde2405a53101a8cc933\"}","body":"UEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAADAAAAYnkvUEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAJAAAAYnkvYWRob2MvUEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAPAAAAYnkvYWRob2MvZnVuZ28vUEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAVAAAAYnkvYWRob2MvZnVuZ28vbW9kZWwvUEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAbAAAAYnkvYWRob2MvZnVuZ28vbW9kZWwvc2FsZXMvUEsDBBQAAAAIAFpxlkaAw61imQEAAJ4DAAAoAAAAYnkvYWRob2MvZnVuZ28vbW9kZWwvc2FsZXMvQWRkcmVzcy5jbGFzc4WRz0rDQBDGv01rYmLUqq3W/1Y8VAWDZ6UgBU9FDxXvabqtW2IC2VTwrRQUwYMP4EOJs3GlEiNeZmYn833z2837x+sbgGPsOjCxYWMKmypsqbBtYcdCg6EciPSeYbEz8u98L/SjoddNExENTxhMmSacp1TEg4EIOBWnIhJpi6HU3L8mcTvuU3e+IyJ+Mb7t8eTK74Vc2cWBH177iVBn3SynN0Iy7HV6957fv4kDbzCOhrF3SyahJ/2QS++s30+4lLTcGvK0nbFVm/tFdJb8Hqg1f39XeDZZdPUVbPmjpv6lvpEtJ7XTjcdJwM+FonU1ypGydjED14WFaRe2Cg6mGXb/vwhDZYJ22RvxIEWD3t+kP1OCoRypMmDr7GTZAlMLKc7SyaPMKE8dvIA9ZmNzFM2saWKeovs1gAoWKNP7Y+mX+AnGQ07sFoqrqGlxi6YNNX1w+AxjstrJuhXSLGQOy19T2kFV6l5q8XIhSCkPUi0EWSkGKeVB6qRZ/QPE1iD1QpByHmSjEGS1GKScB9khTeMPEEeDrGWa9U9QSwMEFAAAAAgAWnGWRgNkOS4QAgAAIwUAACkAAABieS9hZGhvYy9mdW5nby9tb2RlbC9zYWxlcy9DdXN0b21lci5jbGFzc42S3W7TQBCFjx3HjoND27QJtFAgLQE7hRquQZVQERJSVJCCetMr210cV46N/IPEW4EEQuKCB+ChELMbtwFnI3EzY++cOfPt2L9+//gJ4CmetGHAMaFjxMMBD494eGzg0ICrQEu8GVPQHV94Hz039pLQnRRZlITPFDS8kEqbf5VeJwULWUa1Zh6k >> 2015-06-16 13:43:49.459 [FunGo-akka.actor.default-dispatcher-6] DEBUG >> by.adhoc.fungo.actor.SupervisorActor - Receiving request before >> connectedWa3xZVr6MaOi/mGaJiyn3jOZr/48SqLiiAbYzikhHKfn5LQ2jhJ2Us58lr3z/Fh4p4EXn3pZxN+rQ62YRuQ8HPufXO98mgbu+zIJU3dGJrGbezHL3eMyL9KZ4DRCVpyIK27ZjgzGyC8FPXu5zvl0sngRCoEj3YWeV4K+LalzixZZTOYL+xfjamWt/ErRs5cF3MQkk7fVYglFulozX2j69pn8Qu1JWmYBexXxfXYul3XItRbWsWGhBdNCG9csWOhYuI41Bfv/sXAF64uJb/wLFhQY0O9m0N+oQeW+9KRya5HJXWQaQNmEwsdT7NKbS1mh3Bx9h/JFyDYp6uJQxxZFay5AD33KCm7g5lLzV6ifa80dafM2dqrmI1KrXD06+AZ1MbotTjeopysc+nNV5cCfWuIOCm7htgSkUQfpSUF2cUcG0qiDbFPPzgoQvmA++C7uSUC0OsiuFGSAPRmIVgcZUM/eChD+hfngfdyXgDTrIEMpyBAPZCDNOohDPaMVIPwX44Mfih77D1BLAwQUAAAACABacZZGaSHwO4kBAABrAwAAKAAAAGJ5L2FkaG9jL2Z1bmdvL21vZGVsL3NhbGVzL1Byb2R1Y3QuY2xhc3ONUctKw0AUPdNXbI22ttXW9wORtILBteKm4KpUQXHjapqObUqaQB6Cf6WgCC78AD9KvDNGijVqN3dm7pxzzzkzb+8vrwAOsVVAFqt5ZLAmy7qGDQ2bDBmXjwRDuT3kt9x0uNs3L0LfdvtHDHkrCkJvJPyAYe+63b0zeW/gWeZN5PY9c+T1hGMG3BGB2YqBRMod264dnjCkjcYVzW8RjKHYtl3RiUZd4V/yrqMUPYs7V9y35TluZsKBTWK7f2md+14vskKS0voi7Cj7VaORFEALvgCLxs97aU+nEa1xSsNoTJtTD74xG8Z0RClauPAi3xKntoysx3kOpD8dBczqyEHTMYM8w87/D8FQGkc76w6FFWKbvjhL/55CSg5TO5pHaxZMalDV6WTSymS3+Qz2oGBzVHOqmcM8Vf0TgCJKtDIsoPyD/IjU/QR5NpFcQTUmnyhvhG7uPyE1li6obpE4JTVh6RMVT5A7mUcKL2IpwUh60kgl0UgN9SQj6UkjNeLUfzEiH1QKLyvOygdQSwECHwAKAAAAAABacZZGAAAAAAAAAAAAAAAAAwAkAAAAAAAAABAgAAAAAAAAYnkvCgAgAAAAAAABABgAMGli/+x80AFHcWL/7HzQAUdxYv/sfNABUEsBAh8ACgAAAAAAWnGWRgAAAAAAAAAAAAAAAAkAJAAAAAAAAAAQIAAAIQAAAGJ5L2FkaG9jLwoAIAAAAAAAAQAYADBpYv/sfNABR3Fi/+x80AFHcWL/7HzQAVBLAQIfAAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAPACQAAAAAAAAAECAAAEgAAABieS9hZGhvYy9mdW5nby8KACAAAAAAAAEAGADQYGb/7HzQAX9oZv/sfNABR3Fi/+x80AFQSwECHwAKAAAAAABacZZGAAAAAAAAAAAAAAAAFQAkAAAAAAAAABAgAAB1AAAAYnkvYWRob2MvZnVuZ28vbW9kZWwvCgAgAAAAAAABABgAkFNj/+x80AGPW2P/7HzQAY9bY//sfNABUEsBAh8ACgAAAAAAWnGWRgAAAAAAAAAAAAAAABsAJAAAAAAAAAAQIAAAqAAAAGJ5L2FkaG9jL2Z1bmdvL21vZGVsL3NhbGVzLwoAIAAAAAAAAQAYACB3bv/sfNAB+31u/+x80AGPW2P/7HzQAVBLAQIfABQAAAAIAFpxlkaAw61imQEAAJ4DAAAoACQAAAAAAAAAICAAAOEAAABieS9hZGhvYy9mdW5nby9tb2RlbC9zYWxlcy9BZGRyZXNzLmNsYXNzCgAgAAAAAAABABgA8D1k/+x80AHLHmT/7HzQAcseZP/sfNABUEsBAh8AFAAAAAgAWnGWRgNkOS4QAgAAIwUAACkAJAAAAAAAAAAgIAAAwAIAAGJ5L2FkaG9jL2Z1bmdvL21vZGVsL3NhbGVzL0N1c3RvbWVyLmNsYXNzCgAgAAAAAAABABgAsKFj/+x80AGPW2P/7HzQAY9bY//sfNABUEsBAh8AFAAAAAgAWnGWRmkh8DuJAQAAawMAACgAJAAAAAAAAAAgIAAAFwUAAGJ5L2FkaG9jL2Z1bmdvL21vZGVsL3NhbGVzL1Byb2R1Y3QuY2xhc3MKACAAAAAAAAEAGAAwnm7/7HzQAft9bv/sfNAB+31u/+x80AFQSwUGAAAAAAgACABUAwAA5gYAAAAA"} >> 2015-06-16 13:43:49.462 [FunGo-akka.actor.default-dispatcher-4] ERROR >> akka.actor.OneForOneStrategy - Unexpected end-of-input in VALUE_STRING >> at [Source: [B@7174d2bd; line: 1, column: 2857] (through reference chain: >> by.adhoc.fungo.dto.ActorRequest["body"])com.fasterxml.jackson.databind.JsonMappingException: >> >> Unexpected end-of-input inVALUE_STRING >> at [Source: [B@7174d2bd; line: 1, column: 2857] (through reference chain: >> by.adhoc.fungo.dto.ActorRequest["body"]) >> at >> com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210) >> >> ~[jackson-databind-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177) >> >> ~[jackson-databind-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.databind.deser.impl.BeanPropertyMap.wrapAndThrow(BeanPropertyMap.java:439) >> >> ~[jackson-databind-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.databind.deser.impl.BeanPropertyMap.findDeserializeAndSet(BeanPropertyMap.java:287) >> >> ~[jackson-databind-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:248) >> >> ~[jackson-databind-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:136) >> >> ~[jackson-databind-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3562) >> >> ~[jackson-databind-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2671) >> >> ~[jackson-databind-2.5.3.jar:2.5.3] >> at >> by.adhoc.fungo.actor.SupervisorActor.lambda$connected$11(SupervisorActor.java:102) >> >> ~[SupervisorActor.class:na] >> at >> by.adhoc.fungo.actor.SupervisorActor$$Lambda$18/251306795.apply(Unknown >> Source) ~[na:na] >> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) >> ~[akka-actor_2.11-2.3.11.jar:na] >> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) >> ~[akka-actor_2.11-2.3.11.jar:na] >> at >> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) >> ~[scala-library-2.11.5.jar:na] >> at >> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) >> ~[akka-actor_2.11-2.3.11.jar:na] >> at >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) >> ~[scala-library-2.11.5.jar:na] >> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >> ~[akka-actor_2.11-2.3.11.jar:na] >> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:47) >> ~[akka-actor_2.11-2.3.11.jar:na] >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >> [akka-actor_2.11-2.3.11.jar:na] >> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >> [akka-actor_2.11-2.3.11.jar:na] >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >> [akka-actor_2.11-2.3.11.jar:na] >> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >> [akka-actor_2.11-2.3.11.jar:na] >> at >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >> >> [akka-actor_2.11-2.3.11.jar:na] >> at >> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)[scala-library-2.11.5.jar:na] >> at >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> >> [scala-library-2.11.5.jar:na] >> at >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> [scala-library-2.11.5.jar:na] >> at >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> >> [scala-library-2.11.5.jar:na] >> Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected >> end-of-input in VALUE_STRING >> at [Source: [B@7174d2bd; line: 1, column: 2857] >> at >> com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487) >> ~[jackson-core-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518) >> >> ~[jackson-core-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:455) >> >> ~[jackson-core-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:451) >> >> ~[jackson-core-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.core.base.ParserBase.loadMoreGuaranteed(ParserBase.java:460) >> >> ~[jackson-core-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2306) >> >> ~[jackson-core-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString(UTF8StreamJsonParser.java:2287) >> >> ~[jackson-core-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:286) >> >> ~[jackson-core-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserialize(UntypedObjectDeserializer.java:464) >> >> ~[jackson-databind-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:523) >> >> ~[jackson-databind-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:95) >> >> ~[jackson-databind-2.5.3.jar:2.5.3] >> at >> com.fasterxml.jackson.databind.deser.impl.BeanPropertyMap.findDeserializeAndSet(BeanPropertyMap.java:285) >> >> ~[jackson-databind-2.5.3.jar:2.5.3] >> ... 22 common frames omitted >> 2015-06-16 13:52:49.449 [FunGo-akka.actor.default-dispatcher-10] DEBUG >> by.adhoc.fungo.actor.SupervisorActor - received unknown message PeerClosed >> >> >> The idea that this error occurs sometimes and for same data that used in >> successful requests. So I just write same data to socket for many times and >> sometimes it fails. Also I enabled logging in application.conf: >> >> akka { >> loggers = ["akka.event.slf4j.Slf4jLogger"] >> loglevel = "DEBUG" >> >> debug { >> # enable function of Actor.loggable(), which is to log any >> received message >> # at DEBUG level, see the “Testing Actor Systems” section of the >> Akka >> # Documentation at http://akka.io/docs >> receive = true >> >> # enable DEBUG logging of all AutoReceiveMessages (Kill, >> PoisonPill et.c.) >> autoreceive = true >> >> # enable DEBUG logging of actor lifecycle changes >> lifecycle = true >> >> # enable DEBUG logging of all LoggingFSMs for events, transitions >> and timers >> fsm = true >> >> # enable DEBUG logging of subscription changes on the eventStream >> event-stream = true >> >> # enable DEBUG logging of unhandled messages >> unhandled = true >> >> # enable WARN logging of misconfigured routers >> router-misconfiguration = true >> } >> } >> >> >> >> >> >> -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
