Hello, 

When I use Akka Cluster with the PostgreSQL persistence journal 
(https://github.com/okumin/akka-persistence-sql-async), I find events stored in 
the DB that don't belong to my persistent actor but to the cluster itself.

Eg: 
PersistentImpl(ShardRegionRegistered(Actor[akka://application/system/sharding/ReadQuota#587905891]),1,/sharding/ReadQuotaCoordinator,,false,null,9f38b610-c51a-4e69-8de1-480f13376a3e)

On the first start it works, but when my persistent actors replay events, 
the replay fails on this kind of event. 

Do I need to persist them? 
If not, how can I exclude them? 

If yes, how can I persist them in JSON format? I use a JSON serializer 
like this one:

package actors.es;


import actors.consumer.UpdatedQuotaEvent
import actors.event.Event
import actors.event.Event._
import play.api.Logger
import play.api.libs.json.Json


import akka.actor._
import akka.persistence.serialization.MessageSerializer
import akka.persistence.PersistentRepr
import akka.serialization.{SerializerWithStringManifest, Serializer}
import java.nio.charset.StandardCharsets
import play.api.libs.functional.syntax._
import play.api.libs.json._


/**
 * Serializer for [[PersistentRepr]] envelopes.
 *
 * Envelopes whose payload is an `UpdatedQuotaEvent` or an `Event` are written as UTF-8 JSON.
 * Every other object (for example the cluster-sharding coordinator's own events) is delegated
 * to Akka's `MessageSerializer`, so JSON and non-JSON records can coexist in the same journal.
 * `fromBinary` therefore has to recognise both encodings.
 *
 * @param system actor system handed to the fallback `MessageSerializer`
 */
class JsonSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {

  /**
   * Builds a JSON format for a [[PersistentRepr]] whose payload is a `T`.
   *
   * The sender is not part of the JSON document; it is restored as `null`.
   *
   * @param f JSON format of the payload type
   */
  implicit def jsonFormat[T]()(implicit f: Format[T]): Format[PersistentRepr] = {
    (
      (__ \ "payload").format[T] and
      (__ \ "sequence_nr").format[Long] and
      (__ \ "persistence_id").format[String] and
      (__ \ "manifest").format[String] and
      (__ \ "deleted").format[Boolean] and
      (__ \ "writer_uuid").format[String]
    )(PersistentRepr(_, _, _, _, _, null, _), x => (
        x.payload.asInstanceOf[T],
        x.sequenceNr,
        x.persistenceId,
        x.manifest,
        x.deleted,
        x.writerUuid
      ))
  }

  // Fallback used for every object this serializer does not encode as JSON.
  private[this] val serializer: Serializer = new MessageSerializer(system)

  val UpdatedQuotaEventManifest = "UpdatedQuotaEvent"
  val BasicEventManifest = "BasicEvent"
  val PersistentReprManifest = "akka.persistence.PersistentRepr"
  val UTF_8 = StandardCharsets.UTF_8.name()

  // Pick a unique identifier for your Serializer,
  // you've got a couple of billions to choose from,
  // 0 - 16 is reserved by Akka itself
  def identifier: Int = 523

  /**
   * The manifest (type hint) that will be provided to `fromBinary`.
   *
   * Objects that `toBinary` hands to the fallback serializer get `PersistentReprManifest`
   * instead of failing with a `MatchError`; `fromBinary` sniffs the encoding for that manifest.
   */
  def manifest(obj: AnyRef): String =
    obj match {
      case PersistentRepr(_: UpdatedQuotaEvent, _) => UpdatedQuotaEventManifest
      case PersistentRepr(_: Event, _)             => BasicEventManifest
      case _                                       => PersistentReprManifest
    }

  /**
   * Serializes `obj` to bytes: JSON for known event payloads, the fallback serializer otherwise.
   * The more specific `UpdatedQuotaEvent` case is matched before the general `Event` case.
   */
  def toBinary(obj: AnyRef): Array[Byte] =
    obj match {
      case p @ PersistentRepr(_: UpdatedQuotaEvent, _) =>
        toJsonBytes(p, UpdatedQuotaEventManifest)(UpdatedQuotaEvent.updateQuotaEventFormat)
      case p @ PersistentRepr(_: Event, _) =>
        toJsonBytes(p, BasicEventManifest)(Event.event_format)
      case _ =>
        serializeUnknownData(obj)
    }

  /** Stamps `manifest` on the envelope and renders it as UTF-8 encoded JSON. */
  private def toJsonBytes[T](repr: PersistentRepr, manifest: String)(implicit f: Format[T]): Array[Byte] =
    Json.toJson(repr.withManifest(manifest))(jsonFormat[T]()(f)).toString().getBytes(StandardCharsets.UTF_8)

  /** Logs the unexpected object and serializes it with the fallback serializer. */
  private def serializeUnknownData(data: AnyRef): Array[Byte] = {
    Logger.warn(s"unknown dats to serialize \n $data")
    serializer.toBinary(data)
  }

  /**
   * Deserializes `bytes` using the type hint in `manifest`.
   *
   * For `PersistentReprManifest` the bytes may be either a JSON document written by this class
   * or a record written by the fallback serializer. The JSON route is taken only when the bytes
   * parse as JSON and carry one of this class's own manifests; everything else goes to the
   * fallback. This also rules out unbounded recursion on an unexpected inner manifest.
   *
   * Throws if the bytes cannot be decoded by the selected route.
   */
  def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
    manifest match {
      case UpdatedQuotaEventManifest =>
        fromJsonBytes(bytes)(UpdatedQuotaEvent.updateQuotaEventFormat)
      case BasicEventManifest =>
        fromJsonBytes(bytes)(Event.event_format)
      case PersistentReprManifest =>
        jsonManifest(bytes) match {
          case Some(inner @ (UpdatedQuotaEventManifest | BasicEventManifest)) =>
            fromBinary(bytes, inner)
          case _ =>
            serializer.fromBinary(bytes)
        }
      case _ =>
        serializer.fromBinary(bytes)
    }

  /** Parses a JSON envelope whose payload is a `T`; throws if the document does not validate. */
  private def fromJsonBytes[T](bytes: Array[Byte])(implicit f: Format[T]): PersistentRepr =
    Json.fromJson[PersistentRepr](Json.parse(bytes))(jsonFormat[T]()(f)).get

  /** Returns the `manifest` field when `bytes` is a JSON document that has one, else `None`. */
  private def jsonManifest(bytes: Array[Byte]): Option[String] =
    scala.util.Try((Json.parse(bytes) \ "manifest").asOpt[String]).toOption.flatten

}


The config:


# Persistence plugins: journal and snapshot store both come from akka-persistence-sql-async.
akka {
  persistence {
    journal.plugin = "akka-persistence-sql-async.journal"
    snapshot-store.plugin = "akka-persistence-sql-async.snapshot-store"
    view.auto-update-interval = 1s
  }
}


# Serializer registration and bindings.
akka {
  actor {
    serializers {
      json = "actors.es.JsonSerializer"
      # NOTE(review): "proto" is declared here but no binding in this snippet refers to it.
      proto = "akka.remote.serialization.ProtobufSerializer"
    }


    serialization-bindings {
      # Routes every PersistentRepr to the json serializer, regardless of its payload type.
      # NOTE(review): presumably this also covers envelopes persisted by Akka itself
      # (e.g. cluster sharding) - confirm.
      "akka.persistence.PersistentRepr" = json
    }
  }
}

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to