On Wed, 2015-08-05 at 11:04 -0700, Arne Claassen wrote:
> I'm trying to figure out what the best strategy for making changes to
> the message protocols in my cluster that will let me do upgrades while
> the cluster is running. I'm trying to get to a point where I have some
> parity in upgrades as I had when my services talked over HTTP w/JSON.
> I.e. as long as I was backwards compatible in my message changes, I
> could swap out service nodes one at a time.
> 
> 
> Using akka-cluster as my service interconnect infrastructure, this is
> of course trickier. The default Java serializer pretty much dies on
> any changes, so the question is whether I can fix this with a custom
> serializer or I have to be strict and make every protocol change a new
> message type and keep the old and new around until all nodes are on
> the same new version.
> 
> 
> I figured using protobuf might do that trick, since the format itself
> offers this type of backwards compatibility. However, I can neither
> find a repository to resolve akka-protobuf-serialization from nor can
> I get it to build locally. And before I invest a lot of time trying to
> make it work, I wanted to see if anyone can confirm that its
> interaction with akka even lets me get the type of backwards
> compatibility I am hoping for.

I'm using protobuf for this, with the sbt-protobuf plugin to generate
the Java code.

Protobuf allows you to have a compatible wire format, but won't prevent
semantic incompatibilities. Those will need to be handled in your
receiving or sending code.
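For instance (a hypothetical sketch, names invented, not from my codebase): if a newer node introduces an optional field, the wire format alone can't tell the receiver what an absent value means, so the receiving code has to decide:

```scala
// Hypothetical: field2 was added in a later protocol version, so messages
// coming from old nodes arrive without it. The receiver chooses the
// semantics of "absent" explicitly instead of relying on a wire default.
case class MyMessage(field1: String, field2: Option[Int])

def retries(msg: MyMessage): Int =
  msg.field2.getOrElse(3) // semantic decision: absent means "3 retries"
```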

What I usually do is have one protobuf message per Scala case class I
want to send, and a specific serializer for each class of message. I
usually embed the message type in the protobuf schema to allow easy
deserialization (this is an old habit from before akka). My serializer
takes those case classes, transforms them into the protobuf Java
classes, then to bytes (and the reverse).
This is a bit inconvenient (and tedious), but it lets me keep Scala case
classes in the actors (for pattern matching) while still having the
power of protobuf.

Here's a short example:

```scala
  sealed class BaseMessage extends Serializable
  case class MyMessage(field1: String, field2: Option[Int]) extends BaseMessage
```

```protobuf
...
message BaseMessage {
  required int32 request_id = 1;
  extensions 100 to max;
}

message MyMessage {
  extend BaseMessage {
    optional MyMessage my_message = 100;
  }

  optional string field1 = 1;
  optional int32 field2 = 2;
}
```

Then the serializer itself (the code is not complete):
```scala
import akka.actor.ExtendedActorSystem
import akka.serialization.Serializer
import com.google.protobuf.{ExtensionRegistry, Message}
import com.google.protobuf.GeneratedMessage.GeneratedExtension

class BaseMessageSerializer(val system: ExtendedActorSystem) extends Serializer {

  // Extensions must be registered, otherwise parseFrom silently drops them.
  val registry = {
    val r = ExtensionRegistry.newInstance()
    PBBaseMessages.registerAllExtensions(r)
    r
  }

  def identifier: Int = 0x1234567
  def includeManifest: Boolean = false

  def toBinary(o: AnyRef): Array[Byte] = o match {
    case s: MyMessage => myMessageBuilder(s).toByteArray
  }

  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): BaseMessage = {
    val pb = PBBaseMessages.BaseMessage.parseFrom(bytes, registry)
    extractMessage(pb) match {
      case None    => throw new IllegalArgumentException("invalid base message: " + pb)
      case Some(s) => s
    }
  }

  private def extractMessage(request: PBBaseMessages.BaseMessage): Option[BaseMessage] = {
    messageToScala(requestToMessage(request))
  }

  // Find which extension is set on the envelope (the real code dispatches
  // on request_id; simplified here to the single message type).
  private def requestToMessage(request: PBBaseMessages.BaseMessage): Option[Message] = {
    if (request.hasExtension(PBBaseMessages.MyMessage.myMessage))
      Some(request.getExtension(PBBaseMessages.MyMessage.myMessage))
    else None
  }

  private def encapsulate[T <: com.google.protobuf.Message](
      ext: GeneratedExtension[PBBaseMessages.BaseMessage, T],
      pb: T): PBBaseMessages.BaseMessage = {
    PBBaseMessages.BaseMessage.newBuilder
      .setExtension(ext, pb)
      .setRequestId(ext.getDescriptor.getNumber)
      .build
  }

  private def messageToScala(msg: Option[Message]): Option[BaseMessage] = {
    msg map {
      case x: PBBaseMessages.MyMessage => myMessage(x)
    }
  }

  // Scala -> Protobuf
  private def myMessageBuilder(s: MyMessage): PBBaseMessages.BaseMessage = {
    val pb = PBBaseMessages.MyMessage.newBuilder.setField1(s.field1)
    s.field2 foreach (d => pb.setField2(d))
    encapsulate(PBBaseMessages.MyMessage.myMessage, pb.build)
  }

  // Protobuf -> Scala
  private def myMessage(x: PBBaseMessages.MyMessage): MyMessage = {
    MyMessage(x.getField1, if (x.hasField2) Some(x.getField2) else None)
  }
  ...
}
```

And finally we declare the serializer in the configuration file:
```
akka {
  actor {
    serializers {
      base-message = "package.to.BaseMessageSerializer"
    }
    serialization-bindings {
      "package.to.BaseMessage" = base-message
    }
  }
}
```

I don't know if there's a better way, but it has proved to work fine in
our systems.
One important thing: your base class for your messages must inherit from
Serializable, otherwise the default java-serializer will take precedence
over this custom serializer.

> How are people generally dealing with these types of protocol
> evolutions in akka clusters?

We try to be very careful with changes in the semantics of a message.
New fields are usually not a problem, and we are very cautious when we
want to remove a field (we mark it deprecated instead of removing it).
Be careful with 'required' fields: if at some point you want to remove
one, you can run into big trouble (hence we don't have any required
fields).
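To make that concrete, here is a sketch of how the MyMessage schema from the example above could evolve (field3 is invented for illustration): an added optional field is ignored by old readers, and a deprecated field keeps its tag number reserved instead of being deleted:

```protobuf
message MyMessage {
  extend BaseMessage {
    optional MyMessage my_message = 100;
  }

  optional string field1 = 1;
  // deprecated, but kept so tag number 2 is never reused
  optional int32 field2 = 2 [deprecated = true];
  // new field: nodes on the old version simply skip it when parsing
  optional string field3 = 3;
}
```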

I hope this will help you,
-- 
Brice Figureau <[email protected]>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
