Hey Patrik, thanks for the help!
Your solution worked!
Now, when I run my application in a non-clustered environment with a round-robin 
pool (number of instances = 8), it takes 23 seconds to process 2 million rows of 
data. 
But when I run the same application in a clustered environment, it takes 23 minutes! 
The master node reads data from the input file and sends it over to both 
*seed nodes (each node has number of instances = 4)* for evaluation 
(processing). I cannot figure out the reason behind this. 
*Machine Information :*
RAM : 16 GB
Logical Processors : 8
Cores : 4
CPU : Intel(R) Core(TM) i7-4700MQ CPU @ 2.40GHz

Can you help?
*I have attached the configuration file and code examples to this mail; 
please check them out.*

*Application configuration file (without cluster)*

iCEDQDispatcher{

 worker-dispatcher {
      type = Dispatcher
     executor = "fork-join-executor"
     fork-join-executor {
         parallelism-min = 2
         parallelism-factor = 2.0
      parallelism-max = 64
      }
      throughput = 5
   }
 
   CallingThreadDispatcher {
         type = akka.testkit.CallingThreadDispatcherConfigurator
    }
}


*Application configuration file (with cluster)*












*akka {  loggers = ["akka.event.slf4j.Slf4jLogger"]     loglevel = "INFO"   
stdout-loglevel = "INFO"   logging-filter = 
"akka.event.slf4j.Slf4jLoggingFilter"    actor.provider = 
"akka.cluster.ClusterActorRefProvider" remote {    
log-remote-lifecycle-events = on    log-sent-messages = on    
log-received-messages = on    artery {      enabled = on      
canonical.hostname = "192.168.100.199"      canonical.port = 25520    }   
}actor {    kryo {      type = "graph"      idstrategy = "incremental"      
#"explicit"      buffer-size = 4096      max-buffer-size = -1      
use-manifest = false      implicit-registration-logging = true      
kryo-trace = false       mappings {      "[Ljava.lang.String;" = 20      
"[Lorg.iceengine.compare.engine.ICEColSource;" = 21      
"org.iceengine.compare.engine.ICEColSource$Type" = 22      
"java.text.DecimalFormat" = 23      "java.math.RoundingMode" = 24      
"java.text.DecimalFormatSymbols" = 25      "java.util.Locale" = 26      
"[Ljava.lang.Integer;" = 27      "[Ljava.lang.Object;" = 28      
"[Lorg.iceengine.compare.engine.ICECompareColumnData;" = 29      
"[Lorg.iceengine.common.CompareChain;" = 30      
"[Lorg.apache.commons.collections.comparators.ComparatorChain;" = 31        
      "org.iceengine.compare.engine.ICEEngineContext" = 32      
"org.iceengine.compare.akka.RowData" = 33      
"org.iceengine.compare.akka.DataConsumerInspector" = 34      
"org.iceengine.compare.akka.Result" = 35      
"org.iceengine.common.pairs.IceEnginePairs" = 36 
"org.iceengine.common.pairs.IcePairKey" = 37 
"org.iceengine.common.pairs.IceEnginePairsImpl" = 38 
"org.iceengine.common.DataComparator" = 39 
"org.iceengine.common.pairs.IcePairValue" = 40 
"org.iceengine.common.MapCont" = 41 "org.iceengine.common.ObjectComparator" 
= 42 "org.iceengine.compare.akka.AtomicObject" = 43 
"org.iceengine.compare.akka.SimpleBindings" = 44 
"org.iceengine.compare.akka.Work" = 45 
"org.iceengine.compare.comparator.DKChainDiffor" = 46 
"org.iceengine.compare.comparator.DKConvertingDiffor" = 47 
"org.iceengine.compare.comparator.DKDateDiffor" = 48 
"org.iceengine.compare.comparator.DKEqualsDiffor" = 49 
"org.iceengine.compare.comparator.DKIdentityDiffor" = 50 
"org.iceengine.compare.comparator.DKNumberDiffor" = 51 
"org.iceengine.compare.comparator.DKTextDiffor" = 52 
"org.iceengine.compare.conf.DefaultDataComparison" = 53 
"org.iceengine.compare.conf.ICEBuildRule" = 54 
"org.iceengine.compare.conf.ICEDepRule" = 55 
"org.iceengine.compare.conf.ICEEngine" = 56 
"org.iceengine.compare.conf.ICEExpression" = 57 
"org.iceengine.compare.conf.ICERule" = 58 
"org.iceengine.compare.conf.ICERulePT" = 59 
"org.iceengine.compare.conf.ICERuleRuleContainer" = 60 
"org.iceengine.compare.engine.CommonRowObj" = 61 
"org.iceengine.compare.engine.DataComparison" = 62 
"org.iceengine.compare.engine.ICEColSource" = 63 
"org.iceengine.compare.engine.ICECompareColInRow" = 64 
"org.iceengine.compare.engine.ICECompareColumn" = 65 
"org.iceengine.compare.engine.ICECompareColumnData" = 66 
"org.iceengine.compare.engine.ICECompareEngine" = 67 
"org.iceengine.compare.engine.ICECompareRow" = 68 
"org.iceengine.compare.engine.ICEExprCompare" = 69 
"org.iceengine.compare.engine.ICEFinalExpressionDiff" = 70 
"org.iceengine.compare.engine.ICESourceUtil" = 71 
"org.iceengine.compare.engine.ICEUniqueSource" = 72 
"org.iceengine.compare.sources.EngineWritterFormat" = 73 
"org.iceengine.compare.sources.ExternalSortingForUserDefiendColumn" = 74 
"org.iceengine.compare.sources.FileSort" = 75 
"org.iceengine.compare.sources.FinalWritter" = 76 
"org.iceengine.compare.sources.ICEDBWritter" = 77 
"org.iceengine.compare.sources.IceFileWritter" = 78 
"org.iceengine.compare.sources.ICESourceDataBase" = 79 
"org.iceengine.compare.sources.IceSourceFixedFile" = 80 
"org.iceengine.compare.sources.ICESourceFlatFile" = 81 
"org.iceengine.compare.sources.ICESourceList" = 82 
"org.iceengine.compare.sources.ICESourceSS" = 83 
"org.iceengine.compare.sources.ICESourceXMLFile" = 84 
"org.iceengine.compare.sources.IceUtilsDifference" = 85 
"org.iceengine.compare.sources.ListWritter" = 86 
"org.iceengine.compare.sources.LogWriter" = 87 
"org.iceengine.compare.sources.SqlWritter" = 88 
"org.iceengine.compare.sources.SSSheet" = 89 
"org.iceengine.compare.sources.SSSheetImpl" = 90 
"org.iceengine.db.ColumnMetaData" = 91 "org.iceengine.db.DatabaseKeyImpl" = 
92 "org.iceengine.db.DatabaseTab" = 93 
"org.iceengine.db.DatabaseTabInfoDataPer" = 94 
"org.iceengine.db.DatabaseTabPer" = 95 
"org.iceengine.db.EngineConnectionInfo" = 96 "org.iceengine.db.ICEDB" = 97 
"org.iceengine.db.ICEInfoDataType" = 98 "org.iceengine.db.QueryBuilder" = 
99 "org.iceengine.utils.ArrayUtil" = 100 "org.iceengine.utils.BooleanUtil" 
= 101 "org.iceengine.utils.ClassUtil" = 102 
"org.iceengine.utils.DataBaseUtilUtil" = 103 
"org.iceengine.utils.EngineXMLUtils" = 104 "org.iceengine.utils.FileUtil" = 
105 "org.iceengine.utils.ICEDataUtil" = 106 
"org.iceengine.utils.ICEEngineConstants" = 107 
"org.iceengine.utils.ICEUtilsObject" = 108 
"org.iceengine.utils.IceUtilsRes" = 109 
"org.iceengine.utils.ICEUtilsString" = 110 
"org.iceengine.utils.SpringUtils" = 111 
"org.apache.commons.collections.OrderedMap" = 112 
"org.apache.commons.collections.map.LinkedMap" = 113 "java.io.File" = 114 
"java.util.HashMap" = 115 
"org.iceengine.compare.engine.ICECompare$CompareType" = 116 
"org.iceengine.common.CompareChain" = 117      
"org.apache.commons.collections.comparators.ComparatorChain" = 118      
"java.util.ArrayList" = 119      "java.util.BitSet" = 120      
"[Lorg.iceengine.compare.engine.ICEUniqueSource;" = 121      
"[Lorg.iceengine.common.MapCont;" = 122      
"org.iceengine.compare.engine.ICEEngineContext$RuleUKey" = 123      
"java.util.concurrent.atomic.AtomicLong" = 124      
"java.util.concurrent.ConcurrentHashMap" = 125      
"java.util.concurrent.atomic.AtomicLongArray" = 126      
"java.util.concurrent.atomic.AtomicBoolean" = 127      
"java.text.SimpleDateFormat" = 128      "java.util.GregorianCalendar" = 
129      "java.util.Date" = 130      "java.text.DateFormatSymbols" = 131    
  "javax.script.ScriptEngineManager" = 132      }          }     
serialize-messages = on    serializers {      #java = 
"akka.serialization.JavaSerializer"      kryo = 
"com.romix.akka.serialization.kryo.KryoSerializer"    }    
serialization-bindings {                   "[Ljava.lang.String;" = kryo    
  "[Lorg.iceengine.compare.engine.ICEColSource;" = kryo      
"org.iceengine.compare.engine.ICEColSource$Type" = kryo      
"java.text.DecimalFormat" = kryo      "java.math.RoundingMode" = kryo      
"java.text.DecimalFormatSymbols" = kryo      "java.util.Locale" = kryo      
"[Ljava.lang.Integer;" = kryo      "[Ljava.lang.Object;" = kryo      
"[Lorg.iceengine.compare.engine.ICECompareColumnData;" = kryo      
"[Ljava.util.Comparator;" = kryo      
"[Lorg.iceengine.common.CompareChain;" = kryo            
"org.iceengine.compare.engine.ICEEngineContext" = kryo      
"org.iceengine.compare.akka.RowData" = kryo      
"org.iceengine.compare.akka.DataConsumerInspector" = kryo      
"org.iceengine.compare.akka.Result" = kryo      
"org.iceengine.common.pairs.IceEnginePairs" = 
kryo"org.iceengine.common.pairs.IcePairKey" = 
kryo"org.iceengine.common.pairs.IceEnginePairsImpl" = 
kryo"org.iceengine.common.DataComparator" = 
kryo"org.iceengine.common.pairs.IcePairValue" = 
kryo"org.iceengine.common.MapCont" = 
kryo"org.iceengine.common.ObjectComparator" = 
kryo"org.iceengine.compare.akka.AtomicObject" = 
kryo"org.iceengine.compare.akka.SimpleBindings" = 
kryo"org.iceengine.compare.akka.Work" = 
kryo"org.iceengine.compare.comparator.DKChainDiffor" = 
kryo"org.iceengine.compare.comparator.DKConvertingDiffor" = 
kryo"org.iceengine.compare.comparator.DKDateDiffor" = 
kryo"org.iceengine.compare.comparator.DKEqualsDiffor" = 
kryo"org.iceengine.compare.comparator.DKIdentityDiffor" = 
kryo"org.iceengine.compare.comparator.DKNumberDiffor" = 
kryo"org.iceengine.compare.comparator.DKTextDiffor" = 
kryo"org.iceengine.compare.conf.DefaultDataComparison" = 
kryo"org.iceengine.compare.conf.ICEBuildRule" = 
kryo"org.iceengine.compare.conf.ICEDepRule" = 
kryo"org.iceengine.compare.conf.ICEEngine" = 
kryo"org.iceengine.compare.conf.ICEExpression" = 
kryo"org.iceengine.compare.conf.ICERule" = 
kryo"org.iceengine.compare.conf.ICERulePT" = 
kryo"org.iceengine.compare.conf.ICERuleRuleContainer" = 
kryo"org.iceengine.compare.engine.CommonRowObj" = 
kryo"org.iceengine.compare.engine.DataComparison" = 
kryo"org.iceengine.compare.engine.ICEColSource" = 
kryo"org.iceengine.compare.engine.ICECompareColInRow" = 
kryo"org.iceengine.compare.engine.ICECompareColumn" = 
kryo"org.iceengine.compare.engine.ICECompareColumnData" = 
kryo"org.iceengine.compare.engine.ICECompareEngine" = 
kryo"org.iceengine.compare.engine.ICECompareRow" = 
kryo"org.iceengine.compare.engine.ICEExprCompare" = 
kryo"org.iceengine.compare.engine.ICEFinalExpressionDiff" = 
kryo"org.iceengine.compare.engine.ICESourceUtil" = 
kryo"org.iceengine.compare.engine.ICEUniqueSource" = 
kryo"org.iceengine.compare.sources.EngineWritterFormat" = 
kryo"org.iceengine.compare.sources.ExternalSortingForUserDefiendColumn" = 
kryo"org.iceengine.compare.sources.FileSort" = 
kryo"org.iceengine.compare.sources.FinalWritter" = 
kryo"org.iceengine.compare.sources.ICEDBWritter" = 
kryo"org.iceengine.compare.sources.IceFileWritter" = 
kryo"org.iceengine.compare.sources.ICESourceDataBase" = 
kryo"org.iceengine.compare.sources.IceSourceFixedFile" = 
kryo"org.iceengine.compare.sources.ICESourceFlatFile" = 
kryo"org.iceengine.compare.sources.ICESourceList" = 
kryo"org.iceengine.compare.sources.ICESourceSS" = 
kryo"org.iceengine.compare.sources.ICESourceXMLFile" = 
kryo"org.iceengine.compare.sources.IceUtilsDifference" = 
kryo"org.iceengine.compare.sources.ListWritter" = 
kryo"org.iceengine.compare.sources.LogWriter" = 
kryo"org.iceengine.compare.sources.SqlWritter" = 
kryo"org.iceengine.compare.sources.SSSheet" = 
kryo"org.iceengine.compare.sources.SSSheetImpl" = 
kryo"org.iceengine.db.ColumnMetaData" = 
kryo"org.iceengine.db.DatabaseKeyImpl" = kryo"org.iceengine.db.DatabaseTab" 
= kryo"org.iceengine.db.DatabaseTabInfoDataPer" = 
kryo"org.iceengine.db.DatabaseTabPer" = 
kryo"org.iceengine.db.EngineConnectionInfo" = kryo"org.iceengine.db.ICEDB" 
= kryo"org.iceengine.db.ICEInfoDataType" = 
kryo"org.iceengine.db.QueryBuilder" = kryo"org.iceengine.utils.ArrayUtil" = 
kryo"org.iceengine.utils.BooleanUtil" = kryo"org.iceengine.utils.ClassUtil" 
= kryo"org.iceengine.utils.DataBaseUtilUtil" = 
kryo"org.iceengine.utils.EngineXMLUtils" = 
kryo"org.iceengine.utils.FileUtil" = kryo"org.iceengine.utils.ICEDataUtil" 
= kryo"org.iceengine.utils.ICEEngineConstants" = 
kryo"org.iceengine.utils.ICEUtilsObject" = 
kryo"org.iceengine.utils.IceUtilsRes" = 
kryo"org.iceengine.utils.ICEUtilsString" = 
kryo"org.apache.commons.collections.OrderedMap" = 
kryo"org.apache.commons.collections.map.LinkedMap" = kryo"java.io.File" = 
kryo"java.util.HashMap" = 
kryo"org.iceengine.compare.engine.ICECompare$CompareType" = 
kryo"org.iceengine.common.CompareChain" = kryo      
"org.apache.commons.collections.comparators.ComparatorChain" = kryo      
"[Lorg.apache.commons.collections.comparators.ComparatorChain;" = kryo      
"java.util.ArrayList" = kryo      "java.util.BitSet" = kryo      
"[Lorg.iceengine.compare.engine.ICEUniqueSource;" = kryo      
"[Lorg.iceengine.common.MapCont;" = kryo      
"org.iceengine.compare.engine.ICEEngineContext$RuleUKey" = kryo      
"java.util.concurrent.atomic.AtomicLong" = kryo      
 "java.util.concurrent.ConcurrentHashMap" = kryo      
 "java.util.concurrent.atomic.AtomicLongArray" = kryo      
 "java.util.concurrent.atomic.AtomicBoolean" = kryo      
 "java.text.SimpleDateFormat" = kryo       "java.util.GregorianCalendar" = 
kryo       "java.util.Date" = kryo       "java.text.DateFormatSymbols" = 
kryo       "javax.script.ScriptEngineManager" = kryo    }  }  cluster {    
seed-nodes = [      "akka://iCEDQApp@192.168.100.199:2551",      
"akka://iCEDQApp@192.168.100.199:2552"]    #auto-down-unreachable-after = 
10s  }actor.allow-java-serialization = offactor.deployment {   "/*/*" {    
  # Router type provided by metrics extension.     #router = 
cluster-metrics-adaptive-group    router = round-robin-group    # Router 
parameter specific for metrics extension.    # metrics-selector = heap    # 
metrics-selector = load    # metrics-selector = cpu    metrics-selector = 
mix    #    routees.paths = ["/user/expEvaluationBackend"]    
nr-of-instances = 100    cluster {      enabled = on      use-role = 
backend      max-nr-of-instances-per-node = 3      allow-local-routees = 
off    }  }     }worker-dispatcher {    type = Dispatcher   executor = 
"fork-join-executor"   fork-join-executor {        parallelism-min = 2      
 parallelism-factor = 2.0    parallelism-max = 64    }    throughput = 5  
 }  # Disable legacy metrics in 
akka-cluster.cluster.metrics.enabled=offextensions=[ 
"akka.cluster.metrics.ClusterMetricsExtension", 
"com.romix.akka.serialization.kryo.KryoSerializationExtension$" ]  }*
On Monday, May 8, 2017 at 6:37:17 PM UTC+5:30, Patrik Nordwall wrote:
>
> the port number for the seed-nodes does not match canonical.port = 25520
>
> replacing akka.tcp with akka is correct, and if you have that in the code 
> somewhere it must be changed there also
>
> On Mon, May 8, 2017 at 2:36 PM, Kunal Ghosh <kunal....@gmail.com 
> <javascript:>> wrote:
>
>> Thanks @Patrik Your help is much appreciated !! 
>>
>> Below is my configuration for Kryo serialization and the Artery remoting 
>> implementation in the application.conf file. Please go through it and tell me 
>> whether it is correct. 
>> Also, I have a question: is changing the configuration enough, or will I 
>> have to make changes in the code as well?
>>
>> *application.conf*
>>
>> *akka {*
>>
>> *  loggers = ["akka.event.slf4j.Slf4jLogger"]*
>>  *  loglevel = "DEBUG"*
>>  
>> *  stdout-loglevel = "DEBUG"*
>>  
>> *  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"*
>>   
>> *  actor.provider = "akka.cluster.ClusterActorRefProvider"*
>>
>> *# Artery remoting implementation*
>>
>> *remote {*
>> *    log-remote-lifecycle-events = on*
>> *    log-sent-messages = on*
>> *    log-received-messages = on*
>> *    artery {*
>> *      enabled = on*
>> *      canonical.hostname = "192.168.100.199"*
>> *      canonical.port = 25520*
>> *    }* 
>> *  }*
>>   
>> *# **KryoSerializer Configuration*
>>
>> *  actor {*
>> *    kryo {*
>> *      type = "graph"*
>> *      idstrategy = "incremental"*
>> *      buffer-size = 4096*
>> *      max-buffer-size = -1*
>> *      use-manifest = false*
>> *      implicit-registration-logging = true*
>> *      kryo-trace = true*
>>
>> *      mappings {*
>> *        "org.iceengine.compare.engine.ICEEngineContext" = 32*
>> *      "org.iceengine.compare.akka.RowData" = 33*
>> *      "org.iceengine.compare.akka.DataConsumerInspector" = 34*
>> *      "org.iceengine.compare.akka.Result" = 35*
>> *      }*
>> *    }*
>>
>> *    serialize-messages = on*
>> *    serializers {*
>> *      #java = "akka.serialization.JavaSerializer"*
>> *      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"*
>> *    }*
>> *    serialization-bindings {*
>> *      "org.iceengine.compare.engine.ICEEngineContext" = kryo*
>> *      "org.iceengine.compare.akka.RowData" = kryo*
>> *      "org.iceengine.compare.akka.DataConsumerInspector" = kryo*
>> *      "org.iceengine.compare.akka.Result" = kryo*
>> *    }*
>> *  }*
>>     
>> *  cluster {*
>> *    seed-nodes = [*
>> *      "akka://iCEDQApp@192.168.100.199:2551 
>> <http://iCEDQApp@192.168.100.199:2551>",*
>> *      "akka://iCEDQApp@192.168.100.199:2552 
>> <http://iCEDQApp@192.168.100.199:2552>"]*
>>
>> *    #auto-down-unreachable-after = 10s*
>> *  }*
>>
>> *akka.cluster.min-nr-of-members =3*
>> *# //#min-nr-of-members*
>>
>> *# //#role-min-nr-of-members*
>> *akka.cluster.role {*
>> *  frontend.min-nr-of-members = 1*
>> *  backend.min-nr-of-members = 2*
>> *}*
>> *actor.allow-java-serialization = off*
>> *actor.deployment {*
>>  
>> *  "/*/*" {*
>>   
>> *    # Router type provided by metrics extension. *
>> *    #router = cluster-metrics-adaptive-group*
>> *    router = round-robin-group*
>> *    # Router parameter specific for metrics extension.*
>> *    # metrics-selector = heap*
>> *    # metrics-selector = load*
>> *    # metrics-selector = cpu*
>> *    metrics-selector = mix*
>> *    #*
>> *    routees.paths = ["/user/expEvaluationBackend"]*
>> *    nr-of-instances = 100*
>> *    cluster {*
>> *      enabled = on*
>> *      use-role = backend*
>> *      max-nr-of-instances-per-node = 3*
>> *      allow-local-routees = off*
>> *    }*
>> *  }*
>>  
>> *}*
>> *  # Disable legacy metrics in akka-cluster.*
>> *cluster.metrics.enabled=off*
>>
>> *# Enable metrics extension in akka-cluster-metrics.*
>> *extensions=[*
>> *   "akka.cluster.metrics.ClusterMetricsExtension",*
>> *   "com.romix.akka.serialization.kryo.KryoSerializationExtension$"*
>> *   ]*
>>
>>   
>> *}*
>>
>>
>>
>> On Saturday, May 6, 2017 at 5:56:36 PM UTC+5:30, Patrik Nordwall wrote:
>>>
>>> First, don't use java serialization for performance and security 
>>> reasons. Secondly, actor messages should be small (a few 100kB at most). 
>>> Otherwise they will prevent other messages from getting through, such as cluster 
>>> heartbeat messages. Split the large message into smaller messages, or 
>>> transfer it on a side channel such as Akka Http or Stream TCP. I'd also 
>>> recommend that you try the new remoting implementation, see Artery in docs.
>>>
>>> /Patrik
>>> fre 5 maj 2017 kl. 16:44 skrev Kunal Ghosh <kunal....@gmail.com>:
>>>
>>>> Hi,
>>>> my application uses a Akka cluster which has one master node and two 
>>>> child seed nodes. The master node reads data from input file and sends it 
>>>> over to both child nodes for evaluation (processing).
>>>> The application works fine for a smaller data file, e.g. a file with 43 rows, 
>>>> but when the input file is huge, e.g. with 2 million rows, the application 
>>>> fails. The exception thrown, with its stack trace, is given below.
>>>> I have also attached the configuration file and code examples to 
>>>> this mail; please check them out and tell me where I am going wrong.
>>>> Thanks in advance.
>>>>
>>>>
>>>>
>>>>
>>>> WARN 
>>>> [18:48:19.013]{iCEDQApp-akka.actor.default-dispatcher-22}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using
>>>>  
>>>> the default Java serializer for class [org.iceengine.compare.akka.RowData] 
>>>> which is not recommended because of performance implications. Use another 
>>>> serializer or disable this warning using the setting 
>>>> 'akka.actor.warn-about-java-serializer-usage'
>>>> WARN 
>>>> [18:48:21.768]{iCEDQApp-akka.actor.default-dispatcher-28}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using
>>>>  
>>>> the default Java serializer for class [org.iceengine.compare.akka.Result] 
>>>> which is not recommended because of performance implications. Use another 
>>>> serializer or disable this warning using the setting 
>>>> 'akka.actor.warn-about-java-serializer-usage'
>>>> WARN 
>>>> [18:48:21.813]{iCEDQApp-akka.actor.default-dispatcher-4}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using
>>>>  
>>>> the default Java serializer for class [org.iceengine.compare.akka.Result] 
>>>> which is not recommended because of performance implications. Use another 
>>>> serializer or disable this warning using the setting 
>>>> 'akka.actor.warn-about-java-serializer-usage'
>>>> WARN 
>>>> [18:48:23.002]{iCEDQApp-akka.actor.default-dispatcher-3}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Cluster
>>>>  
>>>> Node [akka.tcp://iCEDQApp@192.168.100.199:2551] - Marking node(s) as 
>>>> UNREACHABLE [Member(address = akka.tcp://iCEDQApp@192.168.100.199:62915, 
>>>> status = Up)]. Node roles [backend]
>>>> WARN 
>>>> [18:48:23.058]{iCEDQApp-akka.actor.default-dispatcher-17}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Cluster
>>>>  
>>>> Node [akka.tcp://iCEDQApp@192.168.100.199:62915] - Marking node(s) as 
>>>> UNREACHABLE [Member(address = akka.tcp://iCEDQApp@192.168.100.199:2551, 
>>>> status = Up)]. Node roles []
>>>>  Kunal_ICE 
>>>> ERROR[18:48:23.473]{iCEDQApp-akka.actor.default-dispatcher-24}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$1.apply$mcV$sp:70)-AssociationError
>>>>  
>>>> [akka.tcp://iCEDQApp@192.168.100.199:2552] <- [akka.tcp://
>>>> iCEDQApp@192.168.100.199:62915]: Error [null] [
>>>> java.io.OptionalDataException
>>>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1373)
>>>>  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>>>>  at java.util.HashMap.readObject(HashMap.java:1402)
>>>>  at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source)
>>>>  at 
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>>>  at 
>>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>>>  at 
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
>>>>  at 
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>>>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>>>>  at 
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>>>>  at 
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>>>>  at 
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>>>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>>>>  at 
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>>>>  at 
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>>>>  at 
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>>>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>>>>  at 
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>>>>  at 
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>>>>  at 
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>>>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>>>>  at 
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>>>>  at 
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>>>>  at 
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>>>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>>>>  at 
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>>>>  at 
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>>>>  at 
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>>>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>>>>  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>>>>  at 
>>>> akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:304)
>>>>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>>>  at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:304)
>>>>  at 
>>>> akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:151)
>>>>  at 
>>>> akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:137)
>>>>  at scala.util.Try$.apply(Try.scala:192)
>>>>  at 
>>>> akka.serialization.Serialization.deserialize(Serialization.scala:131)
>>>>  at 
>>>> akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:80)
>>>>  at 
>>>> akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:151)
>>>>  at 
>>>> akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:137)
>>>>  at scala.util.Try$.apply(Try.scala:192)
>>>>  at 
>>>> akka.serialization.Serialization.deserialize(Serialization.scala:131)
>>>>  at 
>>>> akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:30)
>>>>  at 
>>>> akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:64)
>>>>  at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:64)
>>>>  at akka.remote.DefaultMessageDispatcher.msgLog$1(Endpoint.scala:69)
>>>>  at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:81)
>>>>  at 
>>>> akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:988)
>>>>  at akka.actor.Actor$class.aroundReceive(Actor.scala:496)
>>>>  at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452)
>>>>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>  at 
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>  at 
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>  at 
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>>
>>>> *Front End Class*
>>>>
>>>> =======================
>>>> ActorSystem system = 
>>>> ActorSystem.create("iCEDQApp",ConfigFactory.load());
>>>>    
>>>>    System.out.println("IceCompareEngine ============ >>>>>> 
>>>> "+context_._ruleType);
>>>>    ClusterRegisterOnMemberUp registerUp = new 
>>>> ClusterRegisterOnMemberUp(actors,context_.getRiid(),context_,system,context_._ruleType);
>>>>    FutureTask<ActorRef> futureTask = new 
>>>> FutureTask<ActorRef>(registerUp);
>>>>
>>>> //   ExecutorService executor = Executors.newFixedThreadPool(1);
>>>> //   executor.execute(futureTask);
>>>>    Cluster.get(system).registerOnMemberUp(futureTask);
>>>>    while (true){
>>>>     try{
>>>>      if(futureTask.isDone()){
>>>>       System.out.println(">>>>>>>>>>>>>>>>>> done >>>>>>>>>>>>>> ");
>>>>       break;
>>>>      }
>>>>     }catch (Exception e) {
>>>>      // TODO: handle exception
>>>>     }
>>>>    }
>>>>
>>>> -- 
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: 
>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>> >>>>>>>>>> Search the archives: 
>>>> https://groups.google.com/group/akka-user
>>>> --- 
>>>> You received this message because you are subscribed to the Google 
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>> an email to akka-user+...@googlegroups.com.
>>>> To post to this group, send email to akka...@googlegroups.com.
>>>> Visit this group at https://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com <javascript:>.
>> To post to this group, send email to akka...@googlegroups.com 
>> <javascript:>.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
>
> Patrik Nordwall
> Akka Tech Lead
> Lightbend <http://www.lightbend.com/> -  Reactive apps on the JVM
> Twitter: @patriknw
>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
akka {

# Loggers to register at boot time (akka.event.Logging$DefaultLogger logs
  # to STDOUT)
  loggers = ["akka.event.slf4j.Slf4jLogger"]
 
  # Log level used by the configured loggers (see "loggers") as soon
  # as they have been started; before that, see "stdout-loglevel"
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  loglevel = "INFO"
 
  # Log level for the very basic logger activated during ActorSystem startup.
  # This logger prints the log messages to stdout (System.out).
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  stdout-loglevel = "INFO"
 
  # Filter of log events that is used by the LoggingAdapter before
  # publishing log events to the eventStream.
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  
  actor.provider = "akka.cluster.ClusterActorRefProvider"

 remote {
    log-remote-lifecycle-events = on
    log-sent-messages = on
    log-received-messages = on
    artery {
      enabled = on
      canonical.hostname = "192.168.100.199"
      canonical.port = 25520
    } 
  }

actor {
    kryo {
      type = "graph"
      idstrategy = "incremental"
      #"explicit"
      buffer-size = 4096
      max-buffer-size = -1
      use-manifest = false
      implicit-registration-logging = true
      kryo-trace = false
        
      mappings {
      "[Ljava.lang.String;" = 20
      "[Lorg.iceengine.compare.engine.ICEColSource;" = 21
      "org.iceengine.compare.engine.ICEColSource$Type" = 22
      "java.text.DecimalFormat" = 23
      "java.math.RoundingMode" = 24
      "java.text.DecimalFormatSymbols" = 25
      "java.util.Locale" = 26
      "[Ljava.lang.Integer;" = 27
      "[Ljava.lang.Object;" = 28
      "[Lorg.iceengine.compare.engine.ICECompareColumnData;" = 29
      "[Lorg.iceengine.common.CompareChain;" = 30
      "[Lorg.apache.commons.collections.comparators.ComparatorChain;" = 31
      
        "org.iceengine.compare.engine.ICEEngineContext" = 32
      "org.iceengine.compare.akka.RowData" = 33
      "org.iceengine.compare.akka.DataConsumerInspector" = 34
      "org.iceengine.compare.akka.Result" = 35
        "org.iceengine.common.pairs.IceEnginePairs" = 36
        "org.iceengine.common.pairs.IcePairKey" = 37
        "org.iceengine.common.pairs.IceEnginePairsImpl" = 38
        "org.iceengine.common.DataComparator" = 39
        "org.iceengine.common.pairs.IcePairValue" = 40
        "org.iceengine.common.MapCont" = 41
        "org.iceengine.common.ObjectComparator" = 42
        "org.iceengine.compare.akka.AtomicObject" = 43
        "org.iceengine.compare.akka.SimpleBindings" = 44
        "org.iceengine.compare.akka.Work" = 45
        "org.iceengine.compare.comparator.DKChainDiffor" = 46
        "org.iceengine.compare.comparator.DKConvertingDiffor" = 47
        "org.iceengine.compare.comparator.DKDateDiffor" = 48
        "org.iceengine.compare.comparator.DKEqualsDiffor" = 49
        "org.iceengine.compare.comparator.DKIdentityDiffor" = 50
        "org.iceengine.compare.comparator.DKNumberDiffor" = 51
        "org.iceengine.compare.comparator.DKTextDiffor" = 52
        "org.iceengine.compare.conf.DefaultDataComparison" = 53
        "org.iceengine.compare.conf.ICEBuildRule" = 54
        "org.iceengine.compare.conf.ICEDepRule" = 55
        "org.iceengine.compare.conf.ICEEngine" = 56
        "org.iceengine.compare.conf.ICEExpression" = 57
        "org.iceengine.compare.conf.ICERule" = 58
        "org.iceengine.compare.conf.ICERulePT" = 59
        "org.iceengine.compare.conf.ICERuleRuleContainer" = 60
        "org.iceengine.compare.engine.CommonRowObj" = 61
        "org.iceengine.compare.engine.DataComparison" = 62
        "org.iceengine.compare.engine.ICEColSource" = 63
        "org.iceengine.compare.engine.ICECompareColInRow" = 64
        "org.iceengine.compare.engine.ICECompareColumn" = 65
        "org.iceengine.compare.engine.ICECompareColumnData" = 66
        "org.iceengine.compare.engine.ICECompareEngine" = 67
        "org.iceengine.compare.engine.ICECompareRow" = 68
        "org.iceengine.compare.engine.ICEExprCompare" = 69
        "org.iceengine.compare.engine.ICEFinalExpressionDiff" = 70
        "org.iceengine.compare.engine.ICESourceUtil" = 71
        "org.iceengine.compare.engine.ICEUniqueSource" = 72
        "org.iceengine.compare.sources.EngineWritterFormat" = 73
        "org.iceengine.compare.sources.ExternalSortingForUserDefiendColumn" = 74
        "org.iceengine.compare.sources.FileSort" = 75
        "org.iceengine.compare.sources.FinalWritter" = 76
        "org.iceengine.compare.sources.ICEDBWritter" = 77
        "org.iceengine.compare.sources.IceFileWritter" = 78
        "org.iceengine.compare.sources.ICESourceDataBase" = 79
        "org.iceengine.compare.sources.IceSourceFixedFile" = 80
        "org.iceengine.compare.sources.ICESourceFlatFile" = 81
        "org.iceengine.compare.sources.ICESourceList" = 82
        "org.iceengine.compare.sources.ICESourceSS" = 83
        "org.iceengine.compare.sources.ICESourceXMLFile" = 84
        "org.iceengine.compare.sources.IceUtilsDifference" = 85
        "org.iceengine.compare.sources.ListWritter" = 86
        "org.iceengine.compare.sources.LogWriter" = 87
        "org.iceengine.compare.sources.SqlWritter" = 88
        "org.iceengine.compare.sources.SSSheet" = 89
        "org.iceengine.compare.sources.SSSheetImpl" = 90
        "org.iceengine.db.ColumnMetaData" = 91
        "org.iceengine.db.DatabaseKeyImpl" = 92
        "org.iceengine.db.DatabaseTab" = 93
        "org.iceengine.db.DatabaseTabInfoDataPer" = 94
        "org.iceengine.db.DatabaseTabPer" = 95
        "org.iceengine.db.EngineConnectionInfo" = 96
        "org.iceengine.db.ICEDB" = 97
        "org.iceengine.db.ICEInfoDataType" = 98
        "org.iceengine.db.QueryBuilder" = 99
        "org.iceengine.utils.ArrayUtil" = 100
        "org.iceengine.utils.BooleanUtil" = 101
        "org.iceengine.utils.ClassUtil" = 102
        "org.iceengine.utils.DataBaseUtilUtil" = 103
        "org.iceengine.utils.EngineXMLUtils" = 104
        "org.iceengine.utils.FileUtil" = 105
        "org.iceengine.utils.ICEDataUtil" = 106
        "org.iceengine.utils.ICEEngineConstants" = 107
        "org.iceengine.utils.ICEUtilsObject" = 108
        "org.iceengine.utils.IceUtilsRes" = 109
        "org.iceengine.utils.ICEUtilsString" = 110
        "org.iceengine.utils.SpringUtils" = 111
        "org.apache.commons.collections.OrderedMap" = 112
        "org.apache.commons.collections.map.LinkedMap" = 113
        "java.io.File" = 114
        "java.util.HashMap" = 115
        "org.iceengine.compare.engine.ICECompare$CompareType" = 116
        "org.iceengine.common.CompareChain" = 117
      "org.apache.commons.collections.comparators.ComparatorChain" = 118
      "java.util.ArrayList" = 119
      "java.util.BitSet" = 120
      "[Lorg.iceengine.compare.engine.ICEUniqueSource;" = 121
      "[Lorg.iceengine.common.MapCont;" = 122
      "org.iceengine.compare.engine.ICEEngineContext$RuleUKey" = 123
      "java.util.concurrent.atomic.AtomicLong" = 124
      "java.util.concurrent.ConcurrentHashMap" = 125
      "java.util.concurrent.atomic.AtomicLongArray" = 126
      "java.util.concurrent.atomic.AtomicBoolean" = 127
      "java.text.SimpleDateFormat" = 128
      "java.util.GregorianCalendar" = 129
      "java.util.Date" = 130
      "java.text.DateFormatSymbols" = 131
      "javax.script.ScriptEngineManager" = 132
      }
      
    }
        
    # NOTE(review): Akka's reference configuration describes serialize-messages as a
    # testing aid that serializes every message, including local ones -- confirm
    # whether it should stay on for benchmark/production runs.
    serialize-messages = on
    # Only the Kryo serializer is registered; the Java serializer entry is commented out.
    serializers {
      #java = "akka.serialization.JavaSerializer"
      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
    }
    serialization-bindings {
    
     
    
      "[Ljava.lang.String;" = kryo
      "[Lorg.iceengine.compare.engine.ICEColSource;" = kryo
      "org.iceengine.compare.engine.ICEColSource$Type" = kryo
      "java.text.DecimalFormat" = kryo
      "java.math.RoundingMode" = kryo
      "java.text.DecimalFormatSymbols" = kryo
      "java.util.Locale" = kryo
      "[Ljava.lang.Integer;" = kryo
      "[Ljava.lang.Object;" = kryo
      "[Lorg.iceengine.compare.engine.ICECompareColumnData;" = kryo
      "[Ljava.util.Comparator;" = kryo
      "[Lorg.iceengine.common.CompareChain;" = kryo
      
      "org.iceengine.compare.engine.ICEEngineContext" = kryo
      "org.iceengine.compare.akka.RowData" = kryo
      "org.iceengine.compare.akka.DataConsumerInspector" = kryo
      "org.iceengine.compare.akka.Result" = kryo
      "org.iceengine.common.pairs.IceEnginePairs" = kryo
"org.iceengine.common.pairs.IcePairKey" = kryo
"org.iceengine.common.pairs.IceEnginePairsImpl" = kryo
"org.iceengine.common.DataComparator" = kryo
"org.iceengine.common.pairs.IcePairValue" = kryo
"org.iceengine.common.MapCont" = kryo
"org.iceengine.common.ObjectComparator" = kryo
"org.iceengine.compare.akka.AtomicObject" = kryo
"org.iceengine.compare.akka.SimpleBindings" = kryo
"org.iceengine.compare.akka.Work" = kryo
"org.iceengine.compare.comparator.DKChainDiffor" = kryo
"org.iceengine.compare.comparator.DKConvertingDiffor" = kryo
"org.iceengine.compare.comparator.DKDateDiffor" = kryo
"org.iceengine.compare.comparator.DKEqualsDiffor" = kryo
"org.iceengine.compare.comparator.DKIdentityDiffor" = kryo
"org.iceengine.compare.comparator.DKNumberDiffor" = kryo
"org.iceengine.compare.comparator.DKTextDiffor" = kryo
"org.iceengine.compare.conf.DefaultDataComparison" = kryo
"org.iceengine.compare.conf.ICEBuildRule" = kryo
"org.iceengine.compare.conf.ICEDepRule" = kryo
"org.iceengine.compare.conf.ICEEngine" = kryo
"org.iceengine.compare.conf.ICEExpression" = kryo
"org.iceengine.compare.conf.ICERule" = kryo
"org.iceengine.compare.conf.ICERulePT" = kryo
"org.iceengine.compare.conf.ICERuleRuleContainer" = kryo
"org.iceengine.compare.engine.CommonRowObj" = kryo
"org.iceengine.compare.engine.DataComparison" = kryo
"org.iceengine.compare.engine.ICEColSource" = kryo
"org.iceengine.compare.engine.ICECompareColInRow" = kryo
"org.iceengine.compare.engine.ICECompareColumn" = kryo
"org.iceengine.compare.engine.ICECompareColumnData" = kryo
"org.iceengine.compare.engine.ICECompareEngine" = kryo
"org.iceengine.compare.engine.ICECompareRow" = kryo
"org.iceengine.compare.engine.ICEExprCompare" = kryo
"org.iceengine.compare.engine.ICEFinalExpressionDiff" = kryo
"org.iceengine.compare.engine.ICESourceUtil" = kryo
"org.iceengine.compare.engine.ICEUniqueSource" = kryo
"org.iceengine.compare.sources.EngineWritterFormat" = kryo
"org.iceengine.compare.sources.ExternalSortingForUserDefiendColumn" = kryo
"org.iceengine.compare.sources.FileSort" = kryo
"org.iceengine.compare.sources.FinalWritter" = kryo
"org.iceengine.compare.sources.ICEDBWritter" = kryo
"org.iceengine.compare.sources.IceFileWritter" = kryo
"org.iceengine.compare.sources.ICESourceDataBase" = kryo
"org.iceengine.compare.sources.IceSourceFixedFile" = kryo
"org.iceengine.compare.sources.ICESourceFlatFile" = kryo
"org.iceengine.compare.sources.ICESourceList" = kryo
"org.iceengine.compare.sources.ICESourceSS" = kryo
"org.iceengine.compare.sources.ICESourceXMLFile" = kryo
"org.iceengine.compare.sources.IceUtilsDifference" = kryo
"org.iceengine.compare.sources.ListWritter" = kryo
"org.iceengine.compare.sources.LogWriter" = kryo
"org.iceengine.compare.sources.SqlWritter" = kryo
"org.iceengine.compare.sources.SSSheet" = kryo
"org.iceengine.compare.sources.SSSheetImpl" = kryo
"org.iceengine.db.ColumnMetaData" = kryo
"org.iceengine.db.DatabaseKeyImpl" = kryo
"org.iceengine.db.DatabaseTab" = kryo
"org.iceengine.db.DatabaseTabInfoDataPer" = kryo
"org.iceengine.db.DatabaseTabPer" = kryo
"org.iceengine.db.EngineConnectionInfo" = kryo
"org.iceengine.db.ICEDB" = kryo
"org.iceengine.db.ICEInfoDataType" = kryo
"org.iceengine.db.QueryBuilder" = kryo
"org.iceengine.utils.ArrayUtil" = kryo
"org.iceengine.utils.BooleanUtil" = kryo
"org.iceengine.utils.ClassUtil" = kryo
"org.iceengine.utils.DataBaseUtilUtil" = kryo
"org.iceengine.utils.EngineXMLUtils" = kryo
"org.iceengine.utils.FileUtil" = kryo
"org.iceengine.utils.ICEDataUtil" = kryo
"org.iceengine.utils.ICEEngineConstants" = kryo
"org.iceengine.utils.ICEUtilsObject" = kryo
"org.iceengine.utils.IceUtilsRes" = kryo
"org.iceengine.utils.ICEUtilsString" = kryo
"org.apache.commons.collections.OrderedMap" = kryo
"org.apache.commons.collections.map.LinkedMap" = kryo
"java.io.File" = kryo
"java.util.HashMap" = kryo
"org.iceengine.compare.engine.ICECompare$CompareType" = kryo
"org.iceengine.common.CompareChain" = kryo
      "org.apache.commons.collections.comparators.ComparatorChain" = kryo
      "[Lorg.apache.commons.collections.comparators.ComparatorChain;" = kryo
      "java.util.ArrayList" = kryo
      "java.util.BitSet" = kryo
      "[Lorg.iceengine.compare.engine.ICEUniqueSource;" = kryo
      "[Lorg.iceengine.common.MapCont;" = kryo
      "org.iceengine.compare.engine.ICEEngineContext$RuleUKey" = kryo
      "java.util.concurrent.atomic.AtomicLong" = kryo
       "java.util.concurrent.ConcurrentHashMap" = kryo
       "java.util.concurrent.atomic.AtomicLongArray" = kryo
       "java.util.concurrent.atomic.AtomicBoolean" = kryo
       "java.text.SimpleDateFormat" = kryo
       "java.util.GregorianCalendar" = kryo
       "java.util.Date" = kryo
       "java.text.DateFormatSymbols" = kryo
       "javax.script.ScriptEngineManager" = kryo
    }
  }
  # Cluster bootstrap: the two seed nodes live on the same host, ports 2551 and 2552.
  # NOTE(review): the "akka://" scheme is used here while remote.artery.canonical.port
  # above is 25520 -- confirm the seed nodes really listen on 2551/2552.
  cluster {
    seed-nodes = [
      "akka://iCEDQApp@192.168.100.199:2551",
      "akka://iCEDQApp@192.168.100.199:2552"]

    # Automatic downing of unreachable members is left disabled.
    #auto-down-unreachable-after = 10s
  }

actor.allow-java-serialization = off
# Router deployment applied to every actor matching the path pattern "/*/*".
actor.deployment {
        
  "/*/*" {
        
    # Router type provided by metrics extension. 
    #router = cluster-metrics-adaptive-group
    # A plain round-robin group router is used instead of the adaptive one.
    router = round-robin-group
    # Router parameter specific for metrics extension.
    # metrics-selector = heap
    # metrics-selector = load
    # metrics-selector = cpu
    # NOTE(review): described above as a metrics-extension parameter; with
    # round-robin-group it is presumably ignored -- confirm.
    metrics-selector = mix
    #
    # Routees are looked up by this path on the member nodes.
    routees.paths = ["/user/expEvaluationBackend"]
    nr-of-instances = 100
    cluster {
      enabled = on
      # Only members carrying the "backend" role are used; local routees are excluded.
      use-role = backend
      max-nr-of-instances-per-node = 3
      allow-local-routees = off
    }
  }
  
  
 
}
# Fork-join dispatcher definition with parallelism settings min 2 / factor 2.0 / max 64.
# NOTE(review): this copy appears to be nested inside the enclosing akka block, while the
# Java code refers to the dispatcher as "worker-dispatcher" -- confirm which one is used.
worker-dispatcher {
                  type = Dispatcher
                 executor = "fork-join-executor"
                 fork-join-executor {
                parallelism-min = 2
            parallelism-factor = 2.0
                    parallelism-max = 64
                  }
                  throughput = 5
   }
  # Disable legacy metrics in akka-cluster.
cluster.metrics.enabled=off

# Enable metrics extension in akka-cluster-metrics.
# Extensions listed for this actor system: cluster metrics and the Kryo serialization
# extension (the trailing "$" is part of the class name as written).
extensions=[
                        "akka.cluster.metrics.ClusterMetricsExtension",
                        
"com.romix.akka.serialization.kryo.KryoSerializationExtension$"
                        ]

  
}
# Pulls in the base "application" configuration; the settings below are layered on top of it.
include "application"

# //#min-nr-of-members
# Requires two cluster members overall.
akka.cluster.min-nr-of-members =2
# //#min-nr-of-members

# //#role-min-nr-of-members
# Requires two members with the "backend" role; the frontend requirement is commented out.
akka.cluster.role {
  #frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}
# //#role-min-nr-of-members

# //#adaptive-router
# Router deployment applied to every actor matching the path pattern "/*/*".
akka.actor.deployment {
  "/*/*" {
    # Round-robin pool router (not the adaptive router of the metrics extension).
    # NOTE(review): a pool router is combined with routees.paths below, which is the
    # setting used with the group router in the base configuration -- confirm which
    # router kind is intended.
    router = round-robin-pool
   
    # Routees run on the dispatcher defined at the bottom of this file.
    dispatcher = worker-dispatcher
    routees.paths = ["/user/expEvaluationBackend"]
    nr-of-instances = 100
    cluster {
      enabled = on
      # Only members carrying the "backend" role are used; local routees are excluded.
      use-role = backend
      max-nr-of-instances-per-node = 3
      allow-local-routees = off
    }
  }
}
# Root-level fork-join dispatcher (parallelism settings min 2 / factor 2.0 / max 64),
# matching the "worker-dispatcher" name passed to withDispatcher(...) in the Java code.
worker-dispatcher {
                  type = Dispatcher
                 executor = "fork-join-executor"
                 fork-join-executor {
                parallelism-min = 2
            parallelism-factor = 2.0
                    parallelism-max = 64
                  }
                  throughput = 5
   }
package org.iceengine.compare.akka;

import java.util.concurrent.Callable;

import org.iceengine.compare.engine.ICEEngineContext;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

/**
 * Blocking task that creates the {@link Master} actor for one rule run, sends it the
 * engine context and then waits until that actor has terminated.
 *
 * <p>The task polls the actor's termination state, so it occupies the calling thread for
 * the whole duration of the run.
 */
public class ClusterRegisterOnMemberUp implements Callable<ActorRef>{

		/** Pause between two termination checks of the master actor, in milliseconds. */
		private static final long POLL_INTERVAL_MILLIS = 100;

		private final int noOfInstance;
		private final long riid;
		private final ICEEngineContext context;
		private final ActorSystem system;
		private final String _ruleType;

		/**
		 * @param noOfInstance number of worker instances handed to the master
		 * @param riid         run identifier; also used to name the master actor ("master_" + riid)
		 * @param context      engine context sent to the master as its first message
		 * @param system       actor system in which the master is created
		 * @param _ruleType    rule type forwarded to the master
		 */
		public ClusterRegisterOnMemberUp(int noOfInstance, long riid, ICEEngineContext context, ActorSystem system, String _ruleType) {
			super();
			this.noOfInstance = noOfInstance;
			this.riid = riid;
			this.context = context;
			this.system = system;
			this._ruleType = _ruleType;
		}

		/**
		 * Starts the master, hands it the context and blocks until the master has terminated.
		 *
		 * @return the (terminated) master actor reference
		 * @throws InterruptedException if the waiting thread is interrupted; the interrupt flag
		 *         is restored before the exception is propagated
		 */
		@Override
		public ActorRef call() throws Exception{
			ActorRef master = system.actorOf(Props.create(Master.class,noOfInstance, riid,_ruleType), "master_"+riid);
			master.tell(context, ActorRef.noSender());
			while(!master.isTerminated()){
				try{
					Thread.sleep(POLL_INTERVAL_MILLIS);
				}catch(InterruptedException e){
					// Swallowing the interrupt would make this task impossible to cancel:
					// restore the flag and let the caller see the interruption.
					Thread.currentThread().interrupt();
					throw e;
				}
			}
			return master;
		}


}
package org.iceengine.compare.akka;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

import org.apache.commons.lang.ArrayUtils;
import org.iceengine.compare.engine.CommonRowObj;
import org.iceengine.compare.engine.ICECompare;
import org.iceengine.compare.engine.ICECompareEngine;
import org.iceengine.compare.engine.ICECompareRow;
import org.iceengine.compare.engine.ICEComparison;
import org.iceengine.compare.engine.ICEEngineContext;
import org.iceengine.compare.engine.SRCTRGSide;
import org.iceengine.compare.engine.WritterInterface;

import akka.actor.UntypedActor;

public class DataEvaluatorActor extends UntypedActor {
	//private final Object dummyMaster = new Object();
	private ScriptEngine gEngine;
	DataEvaluatorActor(){
		ScriptEngineManager manager = new ScriptEngineManager();
		gEngine = manager.getEngineByName("groovy");
		System.out.println("111111111111111111111111111111111111111111111");
		
	}

	@Override
	public void onReceive(Object message) {
		//System.out.println("============ onreceive worker ===================");

		if( message instanceof RowData )
		{
			
			RowData rowData = ( RowData )message;
			
			try{
				if("report".equalsIgnoreCase(rowData.getContext()._ruleType)){
					new ICECompareEngine().compareReportRuleExprGroovy(rowData.getSrc_(), rowData.getLhsColumns()
							,rowData.getRhsColumns(),rowData.getReplacedColumnsMap(),rowData.getExpressions()
							,rowData.getExpressions_Groovy(),rowData.getExpressions_Org(),rowData.getContext()
							,rowData.isDisplayRecord(),rowData.isOneSided(),rowData.isJoinFailed()
							,rowData.getRuleType(), rowData.getSrcBindings(),gEngine
							,rowData.getOneSide(), rowData.getErrorInfo());
				}else{
					if(rowData.isOneSided()){
						this.writeRowError(rowData.getSrc_(), rowData.getOneSide(), rowData.getContext(), rowData.getContext()._sink
								, "", rowData.isRecordRow(), ((rowData.getOneSide() == SRCTRGSide.LEFT_INDEX) ? rowData.getLhsErrorInfo() : rowData.getRhsErrorInfo()), rowData.isOneSided());

					}else{
						if(rowData.getComparison() < 0){
							this.writeRowError(rowData.getSrc_(), SRCTRGSide.LEFT_INDEX, rowData.getContext(), rowData.getContext()._sink
									, "", rowData.isRecordRow(),  rowData.getLhsErrorInfo(), rowData.isJoinFailed());
						}else if(rowData.getComparison() > 0){
							this.writeRowError(rowData.getTrg_(), SRCTRGSide.RIGHT_INDEX, rowData.getContext(), rowData.getContext()._sink
									, "", rowData.isRecordRow(),  rowData.getRhsErrorInfo(), rowData.isJoinFailed());
						}else{
							new ICECompareEngine().compareColumnDiffRuleExprGroovy(rowData.getSrc_(), rowData.getTrg_(), rowData.getLhsColumns()
									,rowData.getRhsColumns(),rowData.getReplacedColumnsMap(),rowData.getExpressions()
									,rowData.getExpressions_Groovy(),rowData.getExpressions_Org(),rowData.getContext()
									,rowData.isDisplayRecord(),rowData.isOneSided(),rowData.isJoinFailed()
									,rowData.getRuleType(), rowData.getSrcBindings(),gEngine
									,rowData.getOneSide(), rowData.getLhsErrorInfo(),rowData.getRhsErrorInfo());
						}
					}
				}
			}catch (Exception e) {
				// TODO: handle exception
			}finally{
				//Result result = new Result();
				getSender().tell( rowData.getResult(), getSelf() );
			}
		}else if (message instanceof DataConsumerInspector) {

			DataConsumerInspector status = (DataConsumerInspector) message;

			getSender().tell( status.getResult(), getSelf() );

		}
		else{
			//System.out.println("======================= unhahandled message worker");
			unhandled(message);
		}

	}

	private void writeRowError(Object[] row_, int sideIdx_, ICEEngineContext context_,
			WritterInterface sink_, String exprResult, boolean recordRow, String errorInfo, boolean oneSided) throws IOException {
		ICECompare.CompareType compareType = context_._dataComparison.getCompareType();
		if (compareType == ICECompare.CompareType.COLUMN_DIFF)
			return;
		if (row_ == null)
			return;
		ICEComparison dataComparison = context_.getDataComparison();
		long rowStep = context_.getRowStep();
		SRCTRGSide side = SRCTRGSide.getEnumForConstant(sideIdx_);
		//System.out.println("======> 11111 icecomparerow row::: "+ArrayUtils.toString(row_));
		ICECompareRow rowDiff = new ICECompareRow(rowStep, row_, side, dataComparison, exprResult, recordRow, errorInfo, oneSided );
		sink_.writeLog(rowDiff, context_);
	}

	private void writeCommon(Object[] lrow_, Object[] rrow_, int lsideIdx_, int rsideIdx_, ICEEngineContext context_,
			WritterInterface sink_, String exprResult, boolean recordRow, String lhsErrorInfo, String rhsErrorInfo, boolean recordFinalExpression) throws IOException {
		ICECompare.CompareType compareType = context_._dataComparison.getCompareType();
		if (compareType == ICECompare.CompareType.COLUMN_DIFF)
			return;
		if (lrow_ == null && rrow_ == null)
			return;
		ICEComparison dataComparison = context_.getDataComparison();
		long rowStep = context_.getRowStep();
		SRCTRGSide lside = SRCTRGSide.getEnumForConstant(lsideIdx_);
		SRCTRGSide rside = SRCTRGSide.getEnumForConstant(rsideIdx_);
		CommonRowObj commonRow = new CommonRowObj(rowStep, lrow_, rrow_, lside, rside, dataComparison
				,exprResult, recordRow, lhsErrorInfo, rhsErrorInfo, recordFinalExpression);
		sink_.writeLog(commonRow, context_);
	}

}
package com.ice.test;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.iceengine.compare.akka.DataEvaluatorActor;
import org.iceengine.compare.akka.MetricsListener;
import org.iceengine.compare.conf.ICEEngine;
import org.iceengine.compare.engine.ICEEngineContext;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.routing.FromConfig;
import akka.routing.RoundRobinPool;


/**
 * Local test driver: starts two backend actor systems in this JVM and then runs one rule
 * through {@code ICEEngine.startICEEngine}, printing the returned summary.
 */
public class Main {

	/**
	 * Starts backend nodes on ports 2551 and 2552 and then runs rule instance 27256.
	 */
	public static void main(String args[]) throws InterruptedException
	{
		System.out.println(" >>>> Cores >>> "+Runtime.getRuntime().availableProcessors());
		Main.backendRun(new String[] { "2551","192.168.100.199" });
		Main.backendRun(new String[] { "2552","192.168.100.199" });
		Main.run(27256);
	}

	/**
	 * Starts one backend actor system named "iCEDQApp" with the "backend" cluster role.
	 *
	 * <p>It creates the "expEvaluationBackend" router (a round-robin pool of two
	 * {@link DataEvaluatorActor}s on "worker-dispatcher") and a "metricsListener" actor.
	 *
	 * @param args {@code args[0]} is the port to listen on ("0" when absent); a second
	 *             element (host name) is passed by {@link #main} but is not read here
	 */
	public static void backendRun(String args[])
	{
		final String port = args.length > 0 ? args[0] : "0";
		// Apply the port to both transports. The configuration shown for this application
		// enables Artery, which takes its port from akka.remote.artery.canonical.port;
		// overriding only the netty key would leave the requested port unused there.
		final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
				withFallback(ConfigFactory.parseString("akka.remote.artery.canonical.port=" + port)).
				withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
				withFallback(ConfigFactory.load("ExpressionEvaluation"));
		ActorSystem system = ActorSystem.create("iCEDQApp", config);
		System.out.println("\t\t nrOfInstances >>>>>>>>> "+FromConfig.getInstance().routerDispatcher());

		system.actorOf(Props.create(DataEvaluatorActor.class).withDispatcher("worker-dispatcher").withRouter(new RoundRobinPool(2)), "expEvaluationBackend");
		system.actorOf(Props.create(MetricsListener.class), "metricsListener");

	}



	/**
	 * Runs the engine for the hard-coded plan file and prints the summary map it returns.
	 * Any exception or error is printed and not rethrown.
	 *
	 * @param riid rule instance id passed to {@code ICEEngine.startICEEngine}
	 */
	public static void run(int riid){

		org.apache.logging.log4j.Logger instanceLog = org.apache.logging.log4j.LogManager.getLogger("ICEApp");

		// Alternative plan files kept for switching test inputs by hand:
//String str ="C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_27348_plan.xml";
//		String str ="C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_Validation_107740_107802.plan.xml";
//		String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_27256_plan.xml,C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_27256_lhs.dbConnectionInfo.xml,C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_27256_rhs.dbConnectionInfo.xml";
//		String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_172092_172095.plan.xml,C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_172092_172095.lhs.dbConnectionInfo.xml,C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_172092_172095.rhs.dbConnectionInfo.xml";

//		String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_Validation_43_Rows.plan.xml"; // 43 Rows Validation rule
//		String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_43_Rows.plan.xml";

		String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_BigFile Validation_Big_file.plan.xml"; // big-file validation plan (per the file name)
//		String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_Big_File_Rule.plan.xml";
		try
		{
			Map<String, Object> domainSummary =  null;
			domainSummary = ICEEngine.startICEEngine(str, riid, null, instanceLog);

			System.out.println(" BPEL RUNRULE summary - start " );
			System.out.println(" Problem Msg  = "+domainSummary.get("PROBLEMMSG"));
			System.out.println(" LHS missing count = "+domainSummary.get("LHSM"));
			System.out.println(" RHS missing count = "+domainSummary.get("RHSM"));
			System.out.println(" COMMON rows count = "+domainSummary.get("CMN"));
			System.out.println(" COLUMN DIFF COUNT = " +domainSummary.get("CDC"));
			System.out.println(" Total diff  count = "+domainSummary.get("TOTALCOUNT"));
			System.out.println(" Source Query Value = "+domainSummary.get("SRCQV"));
			System.out.println(" Target Query Value = "+domainSummary.get("TRGQV")); 
			System.out.println(" Scalar Result = "+domainSummary.get("SCALARRESULT"));
			System.out.println(" Final Result = "+domainSummary.get("FRESULT"));
			System.out.println(" Final Result By = "+domainSummary.get("FRESULTBY"));
			System.out.println(("BPEL RUNRULE summary - end "));
		} catch (Exception eee)
		{
			eee.printStackTrace();
		}catch (Error e) {
			// Errors are reported as well so that a failed run does not end silently.
			e.printStackTrace();
		}
	}

}
package org.iceengine.compare.akka;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang.ArrayUtils;
import org.iceengine.compare.engine.ICEEngineContext;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.routing.FromConfig;
import akka.routing.RoundRobinPool;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;


/**
 * Coordinator actor for one rule run.
 *
 * <p>On construction it creates two children: a data consumer (the report or
 * the column-diff variant, chosen by rule type) and a router whose settings
 * come from the deployment configuration ({@link FromConfig}).
 *
 * <p>Message protocol handled by {@link #onReceive(Object)}:
 * <ul>
 *   <li>{@code ICEEngineContext} - handed to the data consumer child to start the run.</li>
 *   <li>{@code RowData} - counted as "sent" and passed to the evaluator router.</li>
 *   <li>{@code Result} - counted as "completed".</li>
 *   <li>{@code DataConsumerInspector} - records that the data consumer has reported in.</li>
 * </ul>
 *
 * <p>The actor stops itself once the inspector message has been seen AND the
 * number of results equals the number of rows sent. That condition is checked
 * after both {@code Result} and {@code DataConsumerInspector} messages, because
 * either of them can be the one that makes it true.
 */
public class Master extends UntypedActor {

	private final int numberOfWorkers;
	private final long rrid;
	private final ActorRef dataConsumerActor;
	private final ActorRef dataEvaluatorActor;
	/** {@link System#nanoTime()} reading taken in the constructor; used for the elapsed-time report. */
	private final long start;
	/** Shared result object attached to every row and to the inspector message. */
	private final Result result;
	private final String _ruleType;

	/** Number of {@code Result} messages received so far. */
	private long actorCompletedCounter = 0;
	/** Number of {@code RowData} messages passed to the evaluator router so far. */
	private long actorSentCounterToDataEvaluatorActor = 0;
	/** Set once a {@code DataConsumerInspector} message has been received. */
	private boolean dataConsumerInspectorFlag = false;

	/**
	 * @param nrOfInstances worker count; only stored and printed here, the router itself is configured externally
	 * @param rrid          run identifier, used as a suffix for the child actor names
	 * @param _ruleType     {@code "report"} (case-insensitive) selects the report consumer; anything else the column-diff consumer
	 */
	public Master(int nrOfInstances,  long rrid, String _ruleType) 
	{
		this.numberOfWorkers = nrOfInstances;
		this.rrid = rrid;
		System.out.println("NrOfInstances == "+this.numberOfWorkers);
		this.start = System.nanoTime();
		this.result = new Result();
		this._ruleType = _ruleType;

		if ("report".equalsIgnoreCase(_ruleType)) {
			dataConsumerActor = this.getContext().actorOf(Props.create(DataConsumerActorForReportRule.class), "report_"+this.rrid);
		} else {
			dataConsumerActor = this.getContext().actorOf(Props.create(DataConsumerActorForColumnDiffRule.class), "recon_"+this.rrid);
		}

		// Router type, size and routee placement are taken from the deployment
		// section of the configuration for this actor path.
		dataEvaluatorActor = this.getContext().actorOf(FromConfig.getInstance().props(), "workerRouter_"+this.rrid);
		System.out.println(">>>> dataEvaluatorActor >>> "+this.getContext().children());
	}

	/**
	 * Dispatches on the message type; see the class comment for the protocol.
	 * Anything else is reported through {@code unhandled}.
	 *
	 * @param message the incoming message
	 * @throws IOException declared for compatibility; nothing in this method throws it directly
	 */
	@Override
	public void onReceive(Object message) throws IOException {

		if (message instanceof ICEEngineContext) {

			System.out.println("===========> processing context...............");
			dataConsumerActor.tell(message, getSelf());
			// NOTE(review): no branch below handles the receive-timeout message,
			// so when it fires it ends up in unhandled().
			getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS));

		} else if (message instanceof RowData) {

			forwardRow((RowData) message);

		} else if (message instanceof Result) {

			actorCompletedCounter++;
			stopIfFinished();

		} else if (message instanceof DataConsumerInspector) {

			dataConsumerInspectorFlag = true;
			DataConsumerInspector status = (DataConsumerInspector) message;
			this.result.setDataConsumerInspectorFlag(status.isDataConsumerInspectorFlag());
			status.setResult(this.result);
			System.out.println(" >>>>>>>>>>>>>>>> result : "+this.result.isDataConsumerInspectorFlag()+" , actorSentCounterToDataEvaluatorActor >> "+actorSentCounterToDataEvaluatorActor+" , actorCompletedCounter >> "+actorCompletedCounter);
			// All results may already be in (or no rows were sent at all); without
			// this check the actor would wait forever for a Result that never comes.
			stopIfFinished();

		} else {
			unhandled(message);
		}
	}

	/** Counts the row as sent, attaches the shared result and passes it to the evaluator router. */
	private void forwardRow(RowData rowData) {
		actorSentCounterToDataEvaluatorActor++;
		if (actorSentCounterToDataEvaluatorActor % 100000 == 0) {
			System.out.print(" Kunal_ICE ");
		}
		rowData.setResult(this.result);
		try {
			dataEvaluatorActor.tell(rowData, getSelf());
		} catch (Exception ee) {
			// Best effort: keep processing later rows. Errors (e.g. OutOfMemoryError)
			// are deliberately not caught here so they are not silently hidden.
			// NOTE(review): a row whose tell() failed is still counted as sent.
			ee.printStackTrace();
		}
	}

	/**
	 * Stops this actor (and with it its children) and prints the elapsed time,
	 * but only when the inspector message has been seen and every sent row has
	 * produced a result. Otherwise does nothing.
	 */
	private void stopIfFinished() {
		if (!dataConsumerInspectorFlag || actorCompletedCounter != actorSentCounterToDataEvaluatorActor) {
			return;
		}

		System.out.println("name>> "+getSelf().path().name()+" =========== Stop our actor hierarchy ================= actorCompletedCounter >> "+actorCompletedCounter+" , actorSentCounterToDataEvaluatorActor >> "+actorSentCounterToDataEvaluatorActor);

		getContext().stop( getSelf() );

		System.out.println(" Done");
		long end = System.nanoTime();
		System.out.println(" Total time >>> "+TimeUnit.NANOSECONDS.toSeconds(end - start));
	}

}

Reply via email to