I am using TCP(http://doc.akka.io/docs/akka/snapshot/java/io-tcp.html)
actor as entry point to my java application:
public SupervisorActor() {
receive(ReceiveBuilder.
match(Tcp.Connected.class, msg -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Connecting to socket " +
getHostAndPort(msg.localAddress()));
}
getContext().become(connected(sender()));
sender().tell(TcpMessage.register(self()), self());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Configuration completed");
}
}).
match(ActorRequest.class, actorRequest -> {
ActorRef actorRef =
availableActors.get(actorRequest.getAddress());
if (actorRef != null) {
actorRef.tell(actorRequest, getContext().self());
}
}).
match(Tcp.Bound.class, msg -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Open socket " +
getHostAndPort(msg.localAddress()));
}
}).
match(Tcp.Received.class, msg -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Receiving request before connected" + new
String(msg.data().toArray()));
}
}).
matchAny(msg -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("received unknown message " +
getResponseTest(msg));
}
})
.build());
}
After client connected to socket I changed behaviour to:
private PartialFunction<Object, BoxedUnit> connected(ActorRef
connection) {
return ReceiveBuilder.
match(Tcp.Received.class, msg -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Receiving request " + new
String(msg.data().toArray()));
}
ByteString byteString = msg.data();
ActorRequest actorRequest =
OBJECT_MAPPER.readValue(byteString.toArray(), ActorRequest.class);
ActorRef actorRef =
availableActors.get(actorRequest.getAddress());
if (actorRef == null) {
ActorResponse errorActorResponse =
ActorResponse.createErrorResponse("Unknown address " +
actorRequest.getAddress());
errorActorResponse.setAddress(actorRequest.getAddress());
errorActorResponse.setKey(actorRequest.getKey());
ByteString errorResponse = sendResponse(errorActorResponse);
connection.tell(TcpMessage.write(errorResponse,
TcpMessage.noAck()), self());
} else {
actorRef.tell(actorRequest, getContext().self());
}
}).
match(ActorResponse.class, response -> {
ByteString byteString = sendResponse(response);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Sending response " + new
String(byteString.toArray()));
}
connection.tell(TcpMessage.write(byteString, TcpMessage.noAck()),
self());
}).
match(Tcp.CommandFailed.class, response -> LOGGER.warn("Socket
failed")).
match(Tcp.ConnectionClosed.class, msg -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Connection closed" +
"");
}
getContext().stop(self());
}).
matchAny(
msg -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Received unknown message");
LOGGER.debug("Receiving request " + getResponseTest(msg));
}
}).
build();
}
It's work fine most time, but sometimes I have got only part of client
request in TCP.Received object, so than JSON parsing is failed. And then I
received the second part of request in not connected state:
2015-06-16 13:43:49.443 [FunGo-akka.actor.default-dispatcher-3] DEBUG
by.adhoc.fungo.actor.SupervisorActor - Receiving request
{"address":"structure.import","key":"{\"socketId\":\"LSnQGhc-UtYLrbz0AAAB\",\"_idProject\":\"557ee12575a2e49c0ba6e3ef\",\"_idUser\":\"557ee10375a2e49c0ba6e3ea\",\"pointer\":\"557ffde2405a53101a8cc933\"}","body":"UEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAADAAAAYnkvUEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAJAAAAYnkvYWRob2MvUEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAPAAAAYnkvYWRob2MvZnVuZ28vUEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAVAAAAYnkvYWRob2MvZnVuZ28vbW9kZWwvUEsDBAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAbAAAAYnkvYWRob2MvZnVuZ28vbW9kZWwvc2FsZXMvUEsDBBQAAAAIAFpxlkaAw61imQEAAJ4DAAAoAAAAYnkvYWRob2MvZnVuZ28vbW9kZWwvc2FsZXMvQWRkcmVzcy5jbGFzc4WRz0rDQBDGv01rYmLUqq3W/1Y8VAWDZ6UgBU9FDxXvabqtW2IC2VTwrRQUwYMP4EOJs3GlEiNeZmYn833z2837x+sbgGPsOjCxYWMKmypsqbBtYcdCg6EciPSeYbEz8u98L/SjoddNExENTxhMmSacp1TEg4EIOBWnIhJpi6HU3L8mcTvuU3e+IyJ+Mb7t8eTK74Vc2cWBH177iVBn3SynN0Iy7HV6957fv4kDbzCOhrF3SyahJ/2QS++s30+4lLTcGvK0nbFVm/tFdJb8Hqg1f39XeDZZdPUVbPmjpv6lvpEtJ7XTjcdJwM+FonU1ypGydjED14WFaRe2Cg6mGXb/vwhDZYJ22RvxIEWD3t+kP1OCoRypMmDr7GTZAlMLKc7SyaPMKE8dvIA9ZmNzFM2saWKeovs1gAoWKNP7Y+mX+AnGQ07sFoqrqGlxi6YNNX1w+AxjstrJuhXSLGQOy19T2kFV6l5q8XIhSCkPUi0EWSkGKeVB6qRZ/QPE1iD1QpByHmSjEGS1GKScB9khTeMPEEeDrGWa9U9QSwMEFAAAAAgAWnGWRgNkOS4QAgAAIwUAACkAAABieS9hZGhvYy9mdW5nby9tb2RlbC9zYWxlcy9DdXN0b21lci5jbGFzc42S3W7TQBCFjx3HjoND27QJtFAgLQE7hRquQZVQERJSVJCCetMr210cV46N/IPEW4EEQuKCB+ChELMbtwFnI3EzY++cOfPt2L9+//gJ4CmetGHAMaFjxMMBD494eGzg0ICrQEu8GVPQHV94Hz039pLQnRRZlITPFDS8kEqbf5VeJwULWUa1Zh6k
2015-06-16 13:43:49.459 [FunGo-akka.actor.default-dispatcher-6] DEBUG
by.adhoc.fungo.actor.SupervisorActor - Receiving request before
connectedWa3xZVr6MaOi/mGaJiyn3jOZr/48SqLiiAbYzikhHKfn5LQ2jhJ2Us58lr3z/Fh4p4EXn3pZxN+rQ62YRuQ8HPufXO98mgbu+zIJU3dGJrGbezHL3eMyL9KZ4DRCVpyIK27ZjgzGyC8FPXu5zvl0sngRCoEj3YWeV4K+LalzixZZTOYL+xfjamWt/ErRs5cF3MQkk7fVYglFulozX2j69pn8Qu1JWmYBexXxfXYul3XItRbWsWGhBdNCG9csWOhYuI41Bfv/sXAF64uJb/wLFhQY0O9m0N+oQeW+9KRya5HJXWQaQNmEwsdT7NKbS1mh3Bx9h/JFyDYp6uJQxxZFay5AD33KCm7g5lLzV6ifa80dafM2dqrmI1KrXD06+AZ1MbotTjeopysc+nNV5cCfWuIOCm7htgSkUQfpSUF2cUcG0qiDbFPPzgoQvmA++C7uSUC0OsiuFGSAPRmIVgcZUM/eChD+hfngfdyXgDTrIEMpyBAPZCDNOohDPaMVIPwX44Mfih77D1BLAwQUAAAACABacZZGaSHwO4kBAABrAwAAKAAAAGJ5L2FkaG9jL2Z1bmdvL21vZGVsL3NhbGVzL1Byb2R1Y3QuY2xhc3ONUctKw0AUPdNXbI22ttXW9wORtILBteKm4KpUQXHjapqObUqaQB6Cf6WgCC78AD9KvDNGijVqN3dm7pxzzzkzb+8vrwAOsVVAFqt5ZLAmy7qGDQ2bDBmXjwRDuT3kt9x0uNs3L0LfdvtHDHkrCkJvJPyAYe+63b0zeW/gWeZN5PY9c+T1hGMG3BGB2YqBRMod264dnjCkjcYVzW8RjKHYtl3RiUZd4V/yrqMUPYs7V9y35TluZsKBTWK7f2md+14vskKS0voi7Cj7VaORFEALvgCLxs97aU+nEa1xSsNoTJtTD74xG8Z0RClauPAi3xKntoysx3kOpD8dBczqyEHTMYM8w87/D8FQGkc76w6FFWKbvjhL/55CSg5TO5pHaxZMalDV6WTSymS3+Qz2oGBzVHOqmcM8Vf0TgCJKtDIsoPyD/IjU/QR5NpFcQTUmnyhvhG7uPyE1li6obpE4JTVh6RMVT5A7mUcKL2IpwUh60kgl0UgN9SQj6UkjNeLUfzEiH1QKLyvOygdQSwECHwAKAAAAAABacZZGAAAAAAAAAAAAAAAAAwAkAAAAAAAAABAgAAAAAAAAYnkvCgAgAAAAAAABABgAMGli/+x80AFHcWL/7HzQAUdxYv/sfNABUEsBAh8ACgAAAAAAWnGWRgAAAAAAAAAAAAAAAAkAJAAAAAAAAAAQIAAAIQAAAGJ5L2FkaG9jLwoAIAAAAAAAAQAYADBpYv/sfNABR3Fi/+x80AFHcWL/7HzQAVBLAQIfAAoAAAAAAFpxlkYAAAAAAAAAAAAAAAAPACQAAAAAAAAAECAAAEgAAABieS9hZGhvYy9mdW5nby8KACAAAAAAAAEAGADQYGb/7HzQAX9oZv/sfNABR3Fi/+x80AFQSwECHwAKAAAAAABacZZGAAAAAAAAAAAAAAAAFQAkAAAAAAAAABAgAAB1AAAAYnkvYWRob2MvZnVuZ28vbW9kZWwvCgAgAAAAAAABABgAkFNj/+x80AGPW2P/7HzQAY9bY//sfNABUEsBAh8ACgAAAAAAWnGWRgAAAAAAAAAAAAAAABsAJAAAAAAAAAAQIAAAqAAAAGJ5L2FkaG9jL2Z1bmdvL21vZGVsL3NhbGVzLwoAIAAAAAAAAQAYACB3bv/sfNAB+31u/+x80AGPW2P/7HzQAVBLAQIfABQAAAAIAFpxlkaAw61imQEAAJ4DAAAoACQAAAAAAAAAICAAAOEAAABieS9hZGhvYy9mdW5nby9tb2RlbC9zYWxlcy9BZGRyZXNzLmNsYXNzCgAgAAAAAAABABgA8D1k/+x80AHLHmT/7HzQAcseZP/sfNABUEsBAh8AFAAAAAgAWnGWRgNkOS4QAgAAIwUAACkAJAAAAAAAAAAgIAAAwAIAAGJ5L2FkaG9jL2Z1bmdvL21vZGVsL3NhbGVzL0N1c3RvbWVyLmNsYXNzCgAgAAAAAAABABgAsKFj/+x80AGPW2P/7HzQAY9bY//sfNABUEsBAh8AFAAAAAgAWnGWRmkh8DuJAQAAawMAACgAJAAAAAAAAAAgIAAAFwUAAGJ5L2FkaG9jL2Z1bmdvL21vZGVsL3NhbGVzL1Byb2R1Y3QuY2xhc3MKACAAAAAAAAEAGAAwnm7/7HzQAft9bv/sfNAB+31u/+x80AFQSwUGAAAAAAgACABUAwAA5gYAAAAA"}
2015-06-16 13:43:49.462 [FunGo-akka.actor.default-dispatcher-4] ERROR
akka.actor.OneForOneStrategy - Unexpected end-of-input in VALUE_STRING
at [Source: [B@7174d2bd; line: 1, column: 2857] (through reference chain:
by.adhoc.fungo.dto.ActorRequest["body"])com.fasterxml.jackson.databind.JsonMappingException:
Unexpected end-of-input inVALUE_STRING
at [Source: [B@7174d2bd; line: 1, column: 2857] (through reference chain:
by.adhoc.fungo.dto.ActorRequest["body"])
at
com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210)
~[jackson-databind-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177)
~[jackson-databind-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.databind.deser.impl.BeanPropertyMap.wrapAndThrow(BeanPropertyMap.java:439)
~[jackson-databind-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.databind.deser.impl.BeanPropertyMap.findDeserializeAndSet(BeanPropertyMap.java:287)
~[jackson-databind-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:248)
~[jackson-databind-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:136)
~[jackson-databind-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3562)
~[jackson-databind-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2671)
~[jackson-databind-2.5.3.jar:2.5.3]
at
by.adhoc.fungo.actor.SupervisorActor.lambda$connected$11(SupervisorActor.java:102)
~[SupervisorActor.class:na]
at
by.adhoc.fungo.actor.SupervisorActor$$Lambda$18/251306795.apply(Unknown
Source) ~[na:na]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
~[akka-actor_2.11-2.3.11.jar:na]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
~[akka-actor_2.11-2.3.11.jar:na]
at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
~[scala-library-2.11.5.jar:na]
at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
~[akka-actor_2.11-2.3.11.jar:na]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
~[scala-library-2.11.5.jar:na]
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
~[akka-actor_2.11-2.3.11.jar:na]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:47)
~[akka-actor_2.11-2.3.11.jar:na]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
[akka-actor_2.11-2.3.11.jar:na]
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
[akka-actor_2.11-2.3.11.jar:na]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
[akka-actor_2.11-2.3.11.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
[akka-actor_2.11-2.3.11.jar:na]
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
[akka-actor_2.11-2.3.11.jar:na]
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)[scala-library-2.11.5.jar:na]
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[scala-library-2.11.5.jar:na]
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[scala-library-2.11.5.jar:na]
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[scala-library-2.11.5.jar:na]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected
end-of-input in VALUE_STRING
at [Source: [B@7174d2bd; line: 1, column: 2857]
at
com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
~[jackson-core-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
~[jackson-core-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:455)
~[jackson-core-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:451)
~[jackson-core-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.core.base.ParserBase.loadMoreGuaranteed(ParserBase.java:460)
~[jackson-core-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2306)
~[jackson-core-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString(UTF8StreamJsonParser.java:2287)
~[jackson-core-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:286)
~[jackson-core-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserialize(UntypedObjectDeserializer.java:464)
~[jackson-databind-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:523)
~[jackson-databind-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:95)
~[jackson-databind-2.5.3.jar:2.5.3]
at
com.fasterxml.jackson.databind.deser.impl.BeanPropertyMap.findDeserializeAndSet(BeanPropertyMap.java:285)
~[jackson-databind-2.5.3.jar:2.5.3]
... 22 common frames omitted
2015-06-16 13:52:49.449 [FunGo-akka.actor.default-dispatcher-10] DEBUG
by.adhoc.fungo.actor.SupervisorActor - received unknown message PeerClosed
The idea that this error occurs sometimes and for same data that used in
successful requests. So I just write same data to socket for many times and
sometimes it fails. Also I enabled logging in application.conf:
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
debug {
# enable function of Actor.loggable(), which is to log any received
message
# at DEBUG level, see the “Testing Actor Systems” section of the
Akka
# Documentation at http://akka.io/docs
receive = true
# enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill
et.c.)
autoreceive = true
# enable DEBUG logging of actor lifecycle changes
lifecycle = true
# enable DEBUG logging of all LoggingFSMs for events, transitions
and timers
fsm = true
# enable DEBUG logging of subscription changes on the eventStream
event-stream = true
# enable DEBUG logging of unhandled messages
unhandled = true
# enable WARN logging of misconfigured routers
router-misconfiguration = true
}
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.