Hello,
When I use Akka Cluster with the PostgreSQL persistence journal
(https://github.com/okumin/akka-persistence-sql-async), I get events stored in
the DB that don't belong to my persistent actor but to the cluster itself.
E.g.:
PersistentImpl(ShardRegionRegistered(Actor[akka://application/system/sharding/ReadQuota#587905891]),1,/sharding/ReadQuotaCoordinator,,false,null,9f38b610-c51a-4e69-8de1-480f13376a3e)
On the first start it works, but when my persistent actors replay their events,
they fail on this kind of event.
Do I need to persist them?
If not, how can I exclude them?
If yes, how can I persist them in a JSON format? I use a JSON serializer
like this one:
package actors.es;
import actors.consumer.UpdatedQuotaEvent
import actors.event.Event
import actors.event.Event._
import play.api.Logger
import play.api.libs.json.Json
import akka.actor._
import akka.persistence.serialization.MessageSerializer
import akka.persistence.PersistentRepr
import akka.serialization.{SerializerWithStringManifest, Serializer}
import java.nio.charset.StandardCharsets
import play.api.libs.functional.syntax._
import play.api.libs.json._
/**
 * Akka serializer for `PersistentRepr` envelopes.
 *
 * Envelopes whose payload is an `UpdatedQuotaEvent` or an `Event` are written as
 * UTF-8 JSON. Every other object (for example the events persisted by Akka's own
 * cluster-sharding coordinator) is delegated to Akka's binary `MessageSerializer`,
 * both when writing and when reading.
 *
 * @param system the actor system, handed to the fallback `MessageSerializer`
 */
class JsonSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {

  /**
   * Builds the JSON format of a `PersistentRepr` whose payload has type `T`.
   *
   * @param f JSON format of the payload
   * @tparam T payload type; writing casts the payload to `T` unchecked
   * @return a format reading/writing the fields `payload`, `sequence_nr`,
   *         `persistence_id`, `manifest`, `deleted` and `writer_uuid`
   */
  implicit def jsonFormat[T]()(implicit f: Format[T]): Format[PersistentRepr] =
    (
      (__ \ "payload").format[T] and
      (__ \ "sequence_nr").format[Long] and
      (__ \ "persistence_id").format[String] and
      (__ \ "manifest").format[String] and
      (__ \ "deleted").format[Boolean] and
      (__ \ "writer_uuid").format[String]
    )(
      // The sender is not part of the JSON document, so it is restored as null.
      PersistentRepr(_, _, _, _, _, null, _),
      x => (
        x.payload.asInstanceOf[T],
        x.sequenceNr,
        x.persistenceId,
        x.manifest,
        x.deleted,
        x.writerUuid
      )
    )

  // Binary serializer used for everything this class does not encode as JSON.
  private[this] val serializer: Serializer = new MessageSerializer(system)

  val UpdatedQuotaEventManifest = "UpdatedQuotaEvent"
  val BasicEventManifest = "BasicEvent"
  val PersistentReprManifest = "akka.persistence.PersistentRepr"
  val UTF_8 = StandardCharsets.UTF_8.name()

  // Manifest reported for objects that toBinary hands to the binary fallback.
  // fromBinary routes any manifest it does not recognise to that same fallback.
  private[this] val BinaryFallbackManifest = "BinaryPersistentRepr"

  // Pick a unique identifier for your Serializer,
  // you've got a couple of billions to choose from,
  // 0 - 16 is reserved by Akka itself
  def identifier = 523

  /**
   * Returns the type hint that will be handed back to `fromBinary`.
   *
   * Objects that are not JSON-encoded get a dedicated fallback manifest instead
   * of raising a `MatchError`, mirroring the fallback branch of `toBinary`.
   */
  def manifest(obj: AnyRef): String =
    obj match {
      // Checked before the `Event` case, exactly as in toBinary.
      case PersistentRepr(_: UpdatedQuotaEvent, _) => UpdatedQuotaEventManifest
      case PersistentRepr(_: Event, _)             => BasicEventManifest
      case _                                       => BinaryFallbackManifest
    }

  /**
   * Serializes `obj`: JSON for the known event payloads, the binary
   * `MessageSerializer` (after logging a warning) for anything else.
   */
  def toBinary(obj: AnyRef): Array[Byte] =
    obj match {
      case p @ PersistentRepr(_: UpdatedQuotaEvent, _) =>
        encodeJson(p, UpdatedQuotaEventManifest)(UpdatedQuotaEvent.updateQuotaEventFormat)
      case p @ PersistentRepr(_: Event, _) =>
        encodeJson(p, BasicEventManifest)(Event.event_format)
      case _ =>
        serializeUnknownData(obj)
    }

  // Writes the envelope as UTF-8 JSON, embedding `manifest` in the document so
  // that fromBinary can recover the payload type from the bytes alone.
  private def encodeJson[T](repr: PersistentRepr, manifest: String)(
      payloadFormat: Format[T]): Array[Byte] =
    Json
      .toJson(repr.withManifest(manifest))(jsonFormat[T]()(payloadFormat))
      .toString()
      .getBytes(StandardCharsets.UTF_8)

  private def serializeUnknownData(data: AnyRef): Array[Byte] = {
    Logger.warn(s"unknown dats to serialize \n $data")
    serializer.toBinary(data)
  }

  /**
   * Deserializes `bytes` according to `manifest`.
   *
   * When the manifest is only the generic `PersistentRepr` class name
   * (presumably because the caller knew the target class but not the stored
   * manifest), the real manifest is read from the JSON document itself. Bytes
   * that are not JSON were produced by the binary fallback in `toBinary`, so
   * they are handed back to the binary serializer instead of failing in the
   * JSON parser.
   *
   * @throws IllegalArgumentException if a JSON document does not match the
   *                                  expected envelope format
   */
  def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
    manifest match {
      case UpdatedQuotaEventManifest =>
        decodeJson(bytes)(UpdatedQuotaEvent.updateQuotaEventFormat)
      case BasicEventManifest =>
        decodeJson(bytes)(Event.event_format)
      case PersistentReprManifest =>
        embeddedManifest(bytes) match {
          // Guard against a document naming the generic manifest again, which
          // would otherwise recurse forever.
          case Some(inner) if inner != PersistentReprManifest => fromBinary(bytes, inner)
          case _                                              => serializer.fromBinary(bytes)
        }
      case _ =>
        serializer.fromBinary(bytes)
    }

  // Reads the "manifest" field of a JSON envelope; None when the bytes are not
  // JSON or carry no string manifest.
  private def embeddedManifest(bytes: Array[Byte]): Option[String] =
    scala.util.Try(Json.parse(bytes)).toOption
      .flatMap(json => (json \ "manifest").asOpt[String])

  // Parses a JSON envelope and reports validation errors explicitly instead of
  // the uninformative failure of calling `.get` on a JsError.
  private def decodeJson[T](bytes: Array[Byte])(payloadFormat: Format[T]): PersistentRepr =
    Json
      .fromJson[PersistentRepr](Json.parse(bytes))(jsonFormat[T]()(payloadFormat))
      .fold(
        errors => throw new IllegalArgumentException(
          s"Cannot decode PersistentRepr from JSON: $errors"),
        identity
      )
}
The config:
# Persistence settings: the journal and the snapshot store both point at the
# akka-persistence-sql-async plugin.
akka {
persistence {
journal.plugin = "akka-persistence-sql-async.journal"
snapshot-store.plugin = "akka-persistence-sql-async.snapshot-store"
view.auto-update-interval = 1s
}
}
# Serialization settings: two serializers are declared, but only `json` is bound
# in this snippet.
akka {
actor {
serializers {
json = "actors.es.JsonSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
}
serialization-bindings {
# Every akka.persistence.PersistentRepr is routed to the `json` serializer,
# whatever its payload is.
"akka.persistence.PersistentRepr" = json
}
}
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.