My rule of thumb is that you should never, *ever* use

*idstrategy = "incremental"*

What that is saying is that, at serialization time, if it encounters a type
that isn't registered yet, it *makes up a registration out of thin air.*
This is essentially guaranteed to fail unless the order of serialization
and deserialization is absolutely deterministic.  (Really, I don't think
that idstrategy value should even exist, since it is nothing but a trap for
the unwary.)

That's likely why it's failing: you are trying to serialize some type
(probably in a subfield) that isn't registered, so it's just making up a
serialization ID.  It gets to the other end, which has never heard of that
ID, and crashes.
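
In your trace, the class being assigned ID 910779913 is the *array* type
ICEColSource[], which is a distinct class from ICEColSource and needs its own
registration. A sketch of what that mapping might look like (the `[L...;`
binary array name and the ID 64 are my assumptions, not something from your
config -- check that your akka-kryo version resolves array names this way, and
pick any unused ID):

```hocon
# Arrays are separate classes on the JVM, so registering ICEColSource
# does not register ICEColSource[]. The "[L...;" entry and the ID 64
# below are illustrative, not taken from the original config.
mappings {
  "org.iceengine.compare.engine.ICEUniqueSource" = 72
  "org.iceengine.compare.engine.ICEColSource" = 63
  "[Lorg.iceengine.compare.engine.ICEColSource;" = 64
}
```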

Choose a different idstrategy.  For Akka Persistence I consider "explicit"
to be the only sane option, but that's a lot of work; for other
circumstances, "automatic" should usually work decently well...
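
For reference, a minimal sketch of what the "explicit" setup might look like,
reusing the class names from your config (the IDs are arbitrary, as long as
every node in the cluster uses identical ones):

```hocon
kryo {
  type = "graph"
  # With "explicit", every class that crosses the wire must be listed
  # here; an unregistered class fails fast at serialization time instead
  # of being silently assigned an ID on the fly.
  idstrategy = "explicit"
  mappings {
    "org.iceengine.compare.engine.ICEUniqueSource" = 72
    "org.iceengine.compare.engine.ICEColSource" = 63
  }
}
```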

On Fri, May 12, 2017 at 8:46 AM, Kunal Ghosh <[email protected]> wrote:

> Hi,
> How do I set generics for ObjectArraySerializer in Kryo?
>
> public class ICEUniqueSource{
>
> private final ICEColSource[] _columns;
>
> }
>
>
> public class ICEColSource{
>
> }
>
> Following error --
> 00:11 TRACE: [kryo] Write field: _columns 
> (org.iceengine.compare.engine.ICEUniqueSource)
> pos=788
> 00:11 TRACE: [kryo] Write class 910779913: org.iceengine.compare.engine.
> ICEColSource[]
> 00:11 TRACE: [kryo] setting generics for ObjectArraySerializer
> 00:11 TRACE: [kryo] Write initial object reference 91:
> org.iceengine.compare.engine.ICEColSource[]
> 00:11 DEBUG: [kryo] Write: org.iceengine.compare.engine.ICEColSource[]
>
> WARN [17:57:38.455]{iCEDQApp-akka.actor.default-dispatcher-11}(
> Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Failed
> to deserialize message with serializer id [6] and manifest []. Encountered
> unregistered class ID: 910779913
> Serialization trace:
> _columns (org.iceengine.compare.engine.ICEUniqueSource)
> _model (org.iceengine.compare.sources.ICESourceFlatFile)
> _srcInput (org.iceengine.compare.conf.DefaultDataComparison)
> _dataComparison (org.iceengine.compare.engine.ICEEngineContext)
> _context (org.iceengine.compare.akka.RowData)
>
> application.conf (KryoSerializer configuration only)
>
> akka {
>
>   actor {
>     kryo {
>       type = "graph"
>       idstrategy = "incremental"
>       buffer-size = 4096
>       max-buffer-size = -1
>       use-manifest = false
>       implicit-registration-logging = true
>       kryo-trace = true
>
>       mappings {
>         "org.iceengine.compare.engine.ICEUniqueSource" = 72
>         "org.iceengine.compare.engine.ICEColSource" = 63
>       }
>     }
>
>     serialize-messages = on
>     serializers {
>       #java = "akka.serialization.JavaSerializer"
>       kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
>     }
>     serialization-bindings {
>       "org.iceengine.compare.engine.ICEUniqueSource" = kryo
>       "org.iceengine.compare.engine.ICEColSource" = kryo
>     }
>   }
>
>   # Disable legacy metrics in akka-cluster.
>   cluster.metrics.enabled = off
>
>   # Enable metrics extension in akka-cluster-metrics.
>   extensions = [
>     "akka.cluster.metrics.ClusterMetricsExtension",
>     "com.romix.akka.serialization.kryo.KryoSerializationExtension$"
>   ]
> }
>
> On Monday, May 8, 2017 at 6:37:17 PM UTC+5:30, Patrik Nordwall wrote:
>>
>> The port number in the seed-nodes does not match canonical.port = 25520.
>>
>> Replacing akka.tcp with akka is correct, and if you have that in the code
>> somewhere it must be changed there as well.
>>
>> On Mon, May 8, 2017 at 2:36 PM, Kunal Ghosh <[email protected]> wrote:
>>
>>> Thanks @Patrik, your help is much appreciated!
>>>
>>> Below is my configuration for Kryo serialization and the Artery remote
>>> implementation in my application.conf file. Please go through it and tell
>>> me whether it is correct.
>>> Also, is changing the configuration enough, or will I have to make
>>> changes in the code as well?
>>>
>>> application.conf
>>>
>>> akka {
>>>
>>>   loggers = ["akka.event.slf4j.Slf4jLogger"]
>>>   loglevel = "DEBUG"
>>>   stdout-loglevel = "DEBUG"
>>>   logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
>>>
>>>   actor.provider = "akka.cluster.ClusterActorRefProvider"
>>>
>>>   # Artery remoting implementation
>>>   remote {
>>>     log-remote-lifecycle-events = on
>>>     log-sent-messages = on
>>>     log-received-messages = on
>>>     artery {
>>>       enabled = on
>>>       canonical.hostname = "192.168.100.199"
>>>       canonical.port = 25520
>>>     }
>>>   }
>>>
>>>   # KryoSerializer configuration
>>>   actor {
>>>     kryo {
>>>       type = "graph"
>>>       idstrategy = "incremental"
>>>       buffer-size = 4096
>>>       max-buffer-size = -1
>>>       use-manifest = false
>>>       implicit-registration-logging = true
>>>       kryo-trace = true
>>>
>>>       mappings {
>>>         "org.iceengine.compare.engine.ICEEngineContext" = 32
>>>         "org.iceengine.compare.akka.RowData" = 33
>>>         "org.iceengine.compare.akka.DataConsumerInspector" = 34
>>>         "org.iceengine.compare.akka.Result" = 35
>>>       }
>>>     }
>>>
>>>     serialize-messages = on
>>>     serializers {
>>>       #java = "akka.serialization.JavaSerializer"
>>>       kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
>>>     }
>>>     serialization-bindings {
>>>       "org.iceengine.compare.engine.ICEEngineContext" = kryo
>>>       "org.iceengine.compare.akka.RowData" = kryo
>>>       "org.iceengine.compare.akka.DataConsumerInspector" = kryo
>>>       "org.iceengine.compare.akka.Result" = kryo
>>>     }
>>>   }
>>>
>>>   cluster {
>>>     seed-nodes = [
>>>       "akka://[email protected]:2551",
>>>       "akka://[email protected]:2552"]
>>>
>>>     #auto-down-unreachable-after = 10s
>>>   }
>>>
>>>   akka.cluster.min-nr-of-members = 3
>>>   # //#min-nr-of-members
>>>
>>>   # //#role-min-nr-of-members
>>>   akka.cluster.role {
>>>     frontend.min-nr-of-members = 1
>>>     backend.min-nr-of-members = 2
>>>   }
>>>   actor.allow-java-serialization = off
>>>   actor.deployment {
>>>
>>>     "/*/*" {
>>>
>>>       # Router type provided by metrics extension.
>>>       #router = cluster-metrics-adaptive-group
>>>       router = round-robin-group
>>>       # Router parameter specific for metrics extension.
>>>       # metrics-selector = heap
>>>       # metrics-selector = load
>>>       # metrics-selector = cpu
>>>       metrics-selector = mix
>>>       #
>>>       routees.paths = ["/user/expEvaluationBackend"]
>>>       nr-of-instances = 100
>>>       cluster {
>>>         enabled = on
>>>         use-role = backend
>>>         max-nr-of-instances-per-node = 3
>>>         allow-local-routees = off
>>>       }
>>>     }
>>>   }
>>>
>>>   # Disable legacy metrics in akka-cluster.
>>>   cluster.metrics.enabled = off
>>>
>>>   # Enable metrics extension in akka-cluster-metrics.
>>>   extensions = [
>>>     "akka.cluster.metrics.ClusterMetricsExtension",
>>>     "com.romix.akka.serialization.kryo.KryoSerializationExtension$"
>>>   ]
>>> }
>>>
>>>
>>>
>>> On Saturday, May 6, 2017 at 5:56:36 PM UTC+5:30, Patrik Nordwall wrote:
>>>>
>>>> First, don't use Java serialization, for performance and security
>>>> reasons. Second, actor messages should be small (a few hundred kB at
>>>> most); otherwise they will prevent other messages, such as cluster
>>>> heartbeat messages, from getting through. Split the large message into
>>>> smaller messages, or transfer it on a side channel such as Akka HTTP or
>>>> Streams TCP. I'd also recommend that you try the new remoting
>>>> implementation; see Artery in the docs.
>>>>
>>>> /Patrik
>>>> fre 5 maj 2017 kl. 16:44 skrev Kunal Ghosh <[email protected]>:
>>>>
>>>>> Hi,
>>>>> My application uses an Akka cluster that has one master node and two
>>>>> child seed nodes. The master node reads data from an input file and
>>>>> sends it to both child nodes for evaluation (processing).
>>>>> The application works fine for a smaller data file, e.g. a file with 43
>>>>> rows, but when the input file is huge, say 2 million rows, the
>>>>> application fails. The exception thrown, with stack trace, is given
>>>>> below.
>>>>> I have also attached the configuration file and code examples with this
>>>>> mail; please check them out and tell me where I am wrong.
>>>>> Thanks in advance.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> WARN [18:48:19.013]{iCEDQApp-akka.actor.default-dispatcher-22}(Sl
>>>>> f4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using
>>>>> the default Java serializer for class [org.iceengine.compare.akka.RowData]
>>>>> which is not recommended because of performance implications. Use another
>>>>> serializer or disable this warning using the setting
>>>>> 'akka.actor.warn-about-java-serializer-usage'
>>>>> WARN [18:48:21.768]{iCEDQApp-akka.actor.default-dispatcher-28}(Sl
>>>>> f4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using
>>>>> the default Java serializer for class [org.iceengine.compare.akka.Result]
>>>>> which is not recommended because of performance implications. Use another
>>>>> serializer or disable this warning using the setting
>>>>> 'akka.actor.warn-about-java-serializer-usage'
>>>>> WARN [18:48:21.813]{iCEDQApp-akka.actor.default-dispatcher-4}(Slf
>>>>> 4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using
>>>>> the default Java serializer for class [org.iceengine.compare.akka.Result]
>>>>> which is not recommended because of performance implications. Use another
>>>>> serializer or disable this warning using the setting
>>>>> 'akka.actor.warn-about-java-serializer-usage'
>>>>> WARN [18:48:23.002]{iCEDQApp-akka.actor.default-dispatcher-3}(Slf
>>>>> 4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Cluster
>>>>> Node [akka.tcp://[email protected]:2551] - Marking node(s) as
>>>>> UNREACHABLE [Member(address = akka.tcp://[email protected]
>>>>> 0.199:62915, status = Up)]. Node roles [backend]
>>>>> WARN [18:48:23.058]{iCEDQApp-akka.actor.default-dispatcher-17}(Sl
>>>>> f4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Cluster
>>>>> Node [akka.tcp://[email protected]:62915] - Marking node(s) as
>>>>> UNREACHABLE [Member(address = akka.tcp://[email protected]:2551,
>>>>> status = Up)]. Node roles []
>>>>>  Kunal_ICE ERROR[18:48:23.473]{iCEDQApp-a
>>>>> kka.actor.default-dispatcher-24}(Slf4jLogger$$anonfun$receiv
>>>>> e$1$$anonfun$applyOrElse$1.apply$mcV$sp:70)-AssociationError
>>>>> [akka.tcp://[email protected]:2552] <- [akka.tcp://
>>>>> [email protected]:62915]: Error [null] [
>>>>> java.io.OptionalDataException
>>>>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1373)
>>>>>  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>>>>>  at java.util.HashMap.readObject(HashMap.java:1402)
>>>>>  at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source)
>>>>>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>>>> thodAccessorImpl.java:43)
>>>>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass
>>>>> .java:1058)
>>>>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.
>>>>> java:1909)
>>>>>  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre
>>>>> am.java:1808)
>>>>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>>>>>  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea
>>>>> m.java:2018)
>>>>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.
>>>>> java:1942)
>>>>>  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre
>>>>> am.java:1808)
>>>>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>>>>>  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea
>>>>> m.java:2018)
>>>>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.
>>>>> java:1942)
>>>>>  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre
>>>>> am.java:1808)
>>>>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>>>>>  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea
>>>>> m.java:2018)
>>>>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.
>>>>> java:1942)
>>>>>  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre
>>>>> am.java:1808)
>>>>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>>>>>  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea
>>>>> m.java:2018)
>>>>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.
>>>>> java:1942)
>>>>>  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre
>>>>> am.java:1808)
>>>>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>>>>>  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea
>>>>> m.java:2018)
>>>>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.
>>>>> java:1942)
>>>>>  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre
>>>>> am.java:1808)
>>>>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>>>>>  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>>>>>  at akka.serialization.JavaSerializer$$anonfun$1.apply(
>>>>> Serializer.scala:304)
>>>>>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>>>>  at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:304)
>>>>>  at akka.serialization.Serialization.akka$serialization$
>>>>> Serialization$$deserializeByteArray(Serialization.scala:151)
>>>>>  at akka.serialization.Serialization$$anonfun$deserialize$2.
>>>>> apply(Serialization.scala:137)
>>>>>  at scala.util.Try$.apply(Try.scala:192)
>>>>>  at akka.serialization.Serialization.deserialize(Serialization.
>>>>> scala:131)
>>>>>  at akka.remote.serialization.MessageContainerSerializer.fromBin
>>>>> ary(MessageContainerSerializer.scala:80)
>>>>>  at akka.serialization.Serialization.akka$serialization$
>>>>> Serialization$$deserializeByteArray(Serialization.scala:151)
>>>>>  at akka.serialization.Serialization$$anonfun$deserialize$2.
>>>>> apply(Serialization.scala:137)
>>>>>  at scala.util.Try$.apply(Try.scala:192)
>>>>>  at akka.serialization.Serialization.deserialize(Serialization.
>>>>> scala:131)
>>>>>  at akka.remote.MessageSerializer$.deserialize(MessageSerializer
>>>>> .scala:30)
>>>>>  at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(
>>>>> Endpoint.scala:64)
>>>>>  at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:64)
>>>>>  at akka.remote.DefaultMessageDispatcher.msgLog$1(Endpoint.scala:69)
>>>>>  at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:81)
>>>>>  at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(En
>>>>> dpoint.scala:988)
>>>>>  at akka.actor.Actor$class.aroundReceive(Actor.scala:496)
>>>>>  at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452)
>>>>>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.
>>>>> java:260)
>>>>>  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>>>> ForkJoinPool.java:1339)
>>>>>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
>>>>> l.java:1979)
>>>>>  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
>>>>> orkerThread.java:107)
>>>>>
>>>>>
>>>>> Front End Class
>>>>>
>>>>> =======================
>>>>> ActorSystem system = ActorSystem.create("iCEDQApp",
>>>>>     ConfigFactory.load());
>>>>>
>>>>> System.out.println("IceCompareEngine ============ >>>>>> "
>>>>>     + context_._ruleType);
>>>>> ClusterRegisterOnMemberUp registerUp = new ClusterRegisterOnMemberUp(
>>>>>     actors, context_.getRiid(), context_, system, context_._ruleType);
>>>>> FutureTask<ActorRef> futureTask = new FutureTask<ActorRef>(registerUp);
>>>>>
>>>>> // ExecutorService executor = Executors.newFixedThreadPool(1);
>>>>> // executor.execute(futureTask);
>>>>> Cluster.get(system).registerOnMemberUp(futureTask);
>>>>> try {
>>>>>     // Block until the member-up callback has run the task,
>>>>>     // instead of spinning in a busy-wait loop.
>>>>>     futureTask.get();
>>>>>     System.out.println(">>>>>>>>>>>>>>>>>> done >>>>>>>>>>>>>> ");
>>>>> } catch (Exception e) {
>>>>>     // TODO: handle exception
>>>>> }
>>>>>
>>>>> --
>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>>>>> urrent/additional/faq.html
>>>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou
>>>>> p/akka-user
>>>>> ---
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Akka User List" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to [email protected].
>>>>> To post to this group, send email to [email protected].
>>>>> Visit this group at https://groups.google.com/group/akka-user.
>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>
>>
>>
>> --
>>
>> Patrik Nordwall
>> Akka Tech Lead
>> Lightbend <http://www.lightbend.com/> -  Reactive apps on the JVM
>> Twitter: @patriknw
>>
