I don't think the bottleneck is in Artery/Aeron. We have measured very good throughput such as
- 100 bytes message: 689,655 msg/s, 68,965,517 bytes/s - 10,0000 bytes message: 12,392 msg/s, 123,924,183 bytes/s On Wed, May 17, 2017 at 1:50 PM, Kunal Ghosh <[email protected]> wrote: > Thanks I really appreciate your help !!! > When I run my application for 43 rows (2kb) input file it creates 1.14 gb > of file in c:\Users\admin\AppData\Local\Temp. When i open up that file in > editor apart from other things I see data in it. I think it is due to this > writing of data in file the performance is getting hit ( when i am > processing big file). Is there any way in aeron configure application so > that it won't create that temporary file??? > > On Wednesday, May 17, 2017 at 4:52:12 PM UTC+5:30, Patrik Nordwall wrote: >> >> When you use the embedded media driver (that is the default) the files >> should be deleted when the actor system is terminated, but not if the >> process is killed abruptly. >> >> On Wed, May 17, 2017 at 12:42 PM, Kunal Ghosh <[email protected]> >> wrote: >> >>> I just had to delete aprox 300 GB of aeron-temp files in >>> c:\Users\admin\AppData\Local\Temp. >>> how to clean up Aeron files ? >>> >>> On Wednesday, May 17, 2017 at 12:46:54 PM UTC+5:30, Patrik Nordwall >>> wrote: >>>> >>>> Performance debugging/tuning is not something I can help with in free >>>> OSS support. We would be able to do that in Lightbend's commercial support. >>>> >>>> Regards, >>>> Patrik >>>> >>>> On Wed, May 17, 2017 at 6:51 AM, Kunal Ghosh <[email protected]> >>>> wrote: >>>> >>>>> Is it because I am running application on single physical machine , >>>>> the application is taking more time the process? >>>>> >>>>> >>>>> On Tuesday, May 16, 2017 at 9:22:28 AM UTC+5:30, Kunal Ghosh wrote: >>>>>> >>>>>> Hey Patrik, Thanks for the help !! >>>>>> your solution worked ! >>>>>> Now when I run my application in non clustered environment with round >>>>>> robin pool (no of instances = 8) it take 23 seconds to process 2 million >>>>>> rows of data. >>>>>> But when I run same application in clustered environment it took 23 >>>>>> minutes !!! The master node reads data from input file and sends it over >>>>>> to >>>>>> both *seed-nodes **(each node have no of instances = 4)* for >>>>>> evaluation (processing). I can not figure out the reason behind this. >>>>>> *Machine Information :* >>>>>> RAM : 16 GB >>>>>> Logical Process : 8 >>>>>> Cores : 4 >>>>>> CPU : Intel(R) Core(TM) i7-4700MQ CPU @ 2.40GHz >>>>>> >>>>>> Can you Help ??? >>>>>> *I have also attached the configuration file and code examples are >>>>>> attached with this mail please do check them out.* >>>>>> >>>>>> *Application configuration file (without cluster)* >>>>>> >>>>>> iCEDQDispatcher{ >>>>>> >>>>>> worker-dispatcher { >>>>>> type = Dispatcher >>>>>> executor = "fork-join-executor" >>>>>> fork-join-executor { >>>>>> parallelism-min = 2 >>>>>> parallelism-factor = 2.0 >>>>>> parallelism-max = 64 >>>>>> } >>>>>> throughput = 5 >>>>>> } >>>>>> >>>>>> CallingThreadDispatcher { >>>>>> type = akka.testkit.CallingThreadDispatcherConfigurator >>>>>> } >>>>>> } >>>>>> >>>>>> >>>>>> *Application configuration file (with cluster)* >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> *akka { loggers = ["akka.event.slf4j.Slf4jLogger"] loglevel = >>>>>> "INFO" stdout-loglevel = "INFO" logging-filter = >>>>>> "akka.event.slf4j.Slf4jLoggingFilter" actor.provider = >>>>>> "akka.cluster.ClusterActorRefProvider" remote { >>>>>> log-remote-lifecycle-events = on log-sent-messages = on >>>>>> log-received-messages = on artery { enabled = on >>>>>> canonical.hostname = "192.168.100.199" canonical.port = 25520 } >>>>>> }actor { kryo { type = "graph" idstrategy = "incremental" >>>>>> #"explicit" buffer-size = 4096 max-buffer-size = -1 >>>>>> use-manifest = false implicit-registration-logging = true >>>>>> kryo-trace = false mappings { "[Ljava.lang.String;" = 20 >>>>>> "[Lorg.iceengine.compare.engine.ICEColSource;" = 21 >>>>>> "org.iceengine.compare.engine.ICEColSource$Type" = 22 >>>>>> "java.text.DecimalFormat" = 23 "java.math.RoundingMode" = 24 >>>>>> "java.text.DecimalFormatSymbols" = 25 "java.util.Locale" = 26 >>>>>> "[Ljava.lang.Integer;" = 27 "[Ljava.lang.Object;" = 28 >>>>>> "[Lorg.iceengine.compare.engine.ICECompareColumnData;" = 29 >>>>>> "[Lorg.iceengine.common.CompareChain;" = 30 >>>>>> "[Lorg.apache.commons.collections.comparators.ComparatorChain;" = 31 >>>>>> "org.iceengine.compare.engine.ICEEngineContext" = 32 >>>>>> "org.iceengine.compare.akka.Ro >>>>>> <http://org.iceengine.compare.akka.Ro>wData" >>>>>> = 33 "org.iceengine.compare.akka.DataConsumerInspector" = 34 >>>>>> "org.iceengine.compare.akka.Re >>>>>> <http://org.iceengine.compare.akka.Re>sult" >>>>>> = 35 "org.iceengine.common.pairs.IceEnginePairs" = 36 >>>>>> "org.iceengine.common.pairs.IcePairKey" = 37 >>>>>> "org.iceengine.common.pairs.IceEnginePairsImpl" = 38 >>>>>> "org.iceengine.common.DataComparator" = 39 >>>>>> "org.iceengine.common.pairs.IcePairValue" = 40 >>>>>> "org.iceengine.common.MapCont" = 41 >>>>>> "org.iceengine.common.ObjectComparator" >>>>>> = 42 "org.iceengine.compare.akka.At >>>>>> <http://org.iceengine.compare.akka.At>omicObject" = 43 >>>>>> "org.iceengine.compare.akka.Si >>>>>> <http://org.iceengine.compare.akka.Si>mpleBindings" = 44 >>>>>> "org.iceengine.compare.akka.Work >>>>>> <http://org.iceengine.compare.akka.Work>" >>>>>> = 45 "org.iceengine.compare.comparator.DKChainDiffor" = 46 >>>>>> "org.iceengine.compare.comparator.DKConvertingDiffor" = 47 >>>>>> "org.iceengine.compare.comparator.DKDateDiffor" = 48 >>>>>> "org.iceengine.compare.comparator.DKEqualsDiffor" = 49 >>>>>> "org.iceengine.compare.comparator.DKIdentityDiffor" = 50 >>>>>> "org.iceengine.compare.comparator.DKNumberDiffor" = 51 >>>>>> "org.iceengine.compare.comparator.DKTextDiffor" = 52 >>>>>> "org.iceengine.compare.conf.De >>>>>> <http://org.iceengine.compare.conf.De>faultDataComparison" = 53 >>>>>> "org.iceengine.compare.conf.ICEBuildRule" = 54 >>>>>> "org.iceengine.compare.conf.ICEDepRule" = 55 >>>>>> "org.iceengine.compare.conf.ICEEngine" = 56 >>>>>> "org.iceengine.compare.conf.ICEExpression" = 57 >>>>>> "org.iceengine.compare.conf.ICERule" = 58 >>>>>> "org.iceengine.compare.conf.ICERulePT" = 59 >>>>>> "org.iceengine.compare.conf.ICERuleRuleContainer" = 60 >>>>>> "org.iceengine.compare.engine.CommonRowObj" = 61 >>>>>> "org.iceengine.compare.engine.DataComparison" = 62 >>>>>> "org.iceengine.compare.engine.ICEColSource" = 63 >>>>>> "org.iceengine.compare.engine.ICECompareColInRow" = 64 >>>>>> "org.iceengine.compare.engine.ICECompareColumn" = 65 >>>>>> "org.iceengine.compare.engine.ICECompareColumnData" = 66 >>>>>> "org.iceengine.compare.engine.ICECompareEngine" = 67 >>>>>> "org.iceengine.compare.engine.ICECompareRow" = 68 >>>>>> "org.iceengine.compare.engine.ICEExprCompare" = 69 >>>>>> "org.iceengine.compare.engine.ICEFinalExpressionDiff" = 70 >>>>>> "org.iceengine.compare.engine.ICESourceUtil" = 71 >>>>>> "org.iceengine.compare.engine.ICEUniqueSource" = 72 >>>>>> "org.iceengine.compare.sources.EngineWritterFormat" = 73 >>>>>> "org.iceengine.compare.sources.ExternalSortingForUserDefiendColumn" = 74 >>>>>> "org.iceengine.compare.sources.FileSort" = 75 >>>>>> "org.iceengine.compare.sources.FinalWritter" = 76 >>>>>> "org.iceengine.compare.sources.ICEDBWritter" = 77 >>>>>> "org.iceengine.compare.sources.IceFileWritter" = 78 >>>>>> "org.iceengine.compare.sources.ICESourceDataBase" = 79 >>>>>> "org.iceengine.compare.sources.IceSourceFixedFile" = 80 >>>>>> "org.iceengine.compare.sources.ICESourceFlatFile" = 81 >>>>>> "org.iceengine.compare.sources.ICESourceList" = 82 >>>>>> "org.iceengine.compare.sources.ICESourceSS" = 83 >>>>>> "org.iceengine.compare.sources.ICESourceXMLFile" = 84 >>>>>> "org.iceengine.compare.sources.IceUtilsDifference" = 85 >>>>>> "org.iceengine.compare.sources.ListWritter" = 86 >>>>>> "org.iceengine.compare.sources.LogWriter" = 87 >>>>>> "org.iceengine.compare.sources.SqlWritter" = 88 >>>>>> "org.iceengine.compare.sources.SSSheet" = 89 >>>>>> "org.iceengine.compare.sources.SSSheetImpl" = 90 >>>>>> "org.iceengine.db.ColumnMetaData" = 91 >>>>>> "org.iceengine.db.DatabaseKeyImpl" = >>>>>> 92 "org.iceengine.db.DatabaseTab" = 93 >>>>>> "org.iceengine.db.DatabaseTabInfoDataPer" = 94 >>>>>> "org.iceengine.db.DatabaseTabPer" = 95 >>>>>> "org.iceengine.db.EngineConnectionInfo" = 96 "org.iceengine.db.ICEDB" = >>>>>> 97 >>>>>> "org.iceengine.db.ICEInfoDataType" = 98 "org.iceengine.db.QueryBuilder" = >>>>>> 99 "org.iceengine.utils.ArrayUtil" = 100 >>>>>> "org.iceengine.utils.BooleanUtil" >>>>>> = 101 "org.iceengine.utils.ClassUtil" = 102 >>>>>> "org.iceengine.utils.DataBaseUtilUtil" = 103 >>>>>> "org.iceengine.utils.EngineXMLUtils" = 104 >>>>>> "org.iceengine.utils.FileUtil" = >>>>>> 105 "org.iceengine.utils.ICEDataUtil" = 106 >>>>>> "org.iceengine.utils.ICEEngineConstants" = 107 >>>>>> "org.iceengine.utils.ICEUtilsObject" = 108 >>>>>> "org.iceengine.utils.IceUtilsRes" = 109 >>>>>> "org.iceengine.utils.ICEUtilsString" = 110 >>>>>> "org.iceengine.utils.SpringUtils" = 111 >>>>>> "org.apache.commons.collections.OrderedMap" = 112 >>>>>> "org.apache.commons.collections.map.LinkedMap" = 113 "java.io.File" = 114 >>>>>> "java.util.HashMap" = 115 >>>>>> "org.iceengine.compare.engine.ICECompare$CompareType" = 116 >>>>>> "org.iceengine.common.CompareChain" = 117 >>>>>> "org.apache.commons.collections.comparators.ComparatorChain" = 118 >>>>>> "java.util.ArrayList" = 119 "java.util.BitSet" = 120 >>>>>> "[Lorg.iceengine.compare.engine.ICEUniqueSource;" = 121 >>>>>> "[Lorg.iceengine.common.MapCont;" = 122 >>>>>> "org.iceengine.compare.engine.ICEEngineContext$RuleUKey" = 123 >>>>>> "java.util.concurrent.atomic.AtomicLong" = 124 >>>>>> "java.util.concurrent.ConcurrentHashMap" = 125 >>>>>> "java.util.concurrent.atomic.AtomicLongArray" = 126 >>>>>> "java.util.concurrent.atomic.AtomicBoolean" = 127 >>>>>> "java.text.SimpleDateFormat" = 128 "java.util.GregorianCalendar" = >>>>>> 129 "java.util.Date" = 130 "java.text.DateFormatSymbols" = 131 >>>>>> "javax.script.ScriptEngineManager" = 132 } } >>>>>> serialize-messages = on serializers { #java = >>>>>> "akka.serialization.JavaSerializer" kryo = >>>>>> "com.romix.akka.serialization.kryo.KryoSerializer" } >>>>>> serialization-bindings { "[Ljava.lang.String;" = kryo >>>>>> "[Lorg.iceengine.compare.engine.ICEColSource;" = kryo >>>>>> "org.iceengine.compare.engine.ICEColSource$Type" = kryo >>>>>> "java.text.DecimalFormat" = kryo "java.math.RoundingMode" = kryo >>>>>> "java.text.DecimalFormatSymbols" = kryo "java.util.Locale" = kryo >>>>>> "[Ljava.lang.Integer;" = kryo "[Ljava.lang.Object;" = kryo >>>>>> "[Lorg.iceengine.compare.engine.ICECompareColumnData;" = kryo >>>>>> "[Ljava.util.Comparator;" = kryo >>>>>> "[Lorg.iceengine.common.CompareChain;" = kryo >>>>>> "org.iceengine.compare.engine.ICEEngineContext" = kryo >>>>>> "org.iceengine.compare.akka.Ro >>>>>> <http://org.iceengine.compare.akka.Ro>wData" >>>>>> = kryo "org.iceengine.compare.akka.DataConsumerInspector" = kryo >>>>>> "org.iceengine.compare.akka.Re >>>>>> <http://org.iceengine.compare.akka.Re>sult" >>>>>> = kryo "org.iceengine.common.pairs.IceEnginePairs" = >>>>>> kryo"org.iceengine.common.pairs.IcePairKey" = >>>>>> kryo"org.iceengine.common.pairs.IceEnginePairsImpl" = >>>>>> kryo"org.iceengine.common.DataComparator" = >>>>>> kryo"org.iceengine.common.pairs.IcePairValue" = >>>>>> kryo"org.iceengine.common.MapCont" = >>>>>> kryo"org.iceengine.common.ObjectComparator" = >>>>>> kryo"org.iceengine.compare.akka.At >>>>>> <http://org.iceengine.compare.akka.At>omicObject" = >>>>>> kryo"org.iceengine.compare.akka.Si >>>>>> <http://org.iceengine.compare.akka.Si>mpleBindings" = >>>>>> kryo"org.iceengine.compare.akka.Work >>>>>> <http://org.iceengine.compare.akka.Work>" = >>>>>> kryo"org.iceengine.compare.comparator.DKChainDiffor" = >>>>>> kryo"org.iceengine.compare.comparator.DKConvertingDiffor" = >>>>>> kryo"org.iceengine.compare.comparator.DKDateDiffor" = >>>>>> kryo"org.iceengine.compare.comparator.DKEqualsDiffor" = >>>>>> kryo"org.iceengine.compare.comparator.DKIdentityDiffor" = >>>>>> kryo"org.iceengine.compare.comparator.DKNumberDiffor" = >>>>>> kryo"org.iceengine.compare.comparator.DKTextDiffor" = >>>>>> kryo"org.iceengine.compare.conf.De >>>>>> <http://org.iceengine.compare.conf.De>faultDataComparison" = >>>>>> kryo"org.iceengine.compare.conf.ICEBuildRule" = >>>>>> kryo"org.iceengine.compare.conf.ICEDepRule" = >>>>>> kryo"org.iceengine.compare.conf.ICEEngine" = >>>>>> kryo"org.iceengine.compare.conf.ICEExpression" = >>>>>> kryo"org.iceengine.compare.conf.ICERule" = >>>>>> kryo"org.iceengine.compare.conf.ICERulePT" = >>>>>> kryo"org.iceengine.compare.conf.ICERuleRuleContainer" = >>>>>> kryo"org.iceengine.compare.engine.CommonRowObj" = >>>>>> kryo"org.iceengine.compare.engine.DataComparison" = >>>>>> kryo"org.iceengine.compare.engine.ICEColSource" = >>>>>> kryo"org.iceengine.compare.engine.ICECompareColInRow" = >>>>>> kryo"org.iceengine.compare.engine.ICECompareColumn" = >>>>>> kryo"org.iceengine.compare.engine.ICECompareColumnData" = >>>>>> kryo"org.iceengine.compare.engine.ICECompareEngine" = >>>>>> kryo"org.iceengine.compare.engine.ICECompareRow" = >>>>>> kryo"org.iceengine.compare.engine.ICEExprCompare" = >>>>>> kryo"org.iceengine.compare.engine.ICEFinalExpressionDiff" = >>>>>> kryo"org.iceengine.compare.engine.ICESourceUtil" = >>>>>> kryo"org.iceengine.compare.engine.ICEUniqueSource" = >>>>>> kryo"org.iceengine.compare.sources.EngineWritterFormat" = >>>>>> kryo"org.iceengine.compare.sources.ExternalSortingForUserDefiendColumn" = >>>>>> kryo"org.iceengine.compare.sources.FileSort" = >>>>>> kryo"org.iceengine.compare.sources.FinalWritter" = >>>>>> kryo"org.iceengine.compare.sources.ICEDBWritter" = >>>>>> kryo"org.iceengine.compare.sources.IceFileWritter" = >>>>>> kryo"org.iceengine.compare.sources.ICESourceDataBase" = >>>>>> kryo"org.iceengine.compare.sources.IceSourceFixedFile" = >>>>>> kryo"org.iceengine.compare.sources.ICESourceFlatFile" = >>>>>> kryo"org.iceengine.compare.sources.ICESourceList" = >>>>>> kryo"org.iceengine.compare.sources.ICESourceSS" = >>>>>> kryo"org.iceengine.compare.sources.ICESourceXMLFile" = >>>>>> kryo"org.iceengine.compare.sources.IceUtilsDifference" = >>>>>> kryo"org.iceengine.compare.sources.ListWritter" = >>>>>> kryo"org.iceengine.compare.sources.LogWriter" = >>>>>> kryo"org.iceengine.compare.sources.SqlWritter" = >>>>>> kryo"org.iceengine.compare.sources.SSSheet" = >>>>>> kryo"org.iceengine.compare.sources.SSSheetImpl" = >>>>>> kryo"org.iceengine.db.ColumnMetaData" = >>>>>> kryo"org.iceengine.db.DatabaseKeyImpl" = >>>>>> kryo"org.iceengine.db.DatabaseTab" >>>>>> = kryo"org.iceengine.db.DatabaseTabInfoDataPer" = >>>>>> kryo"org.iceengine.db.DatabaseTabPer" = >>>>>> kryo"org.iceengine.db.EngineConnectionInfo" = >>>>>> kryo"org.iceengine.db.ICEDB" >>>>>> = kryo"org.iceengine.db.ICEInfoDataType" = >>>>>> kryo"org.iceengine.db.QueryBuilder" = >>>>>> kryo"org.iceengine.utils.ArrayUtil" = >>>>>> kryo"org.iceengine.utils.BooleanUtil" = >>>>>> kryo"org.iceengine.utils.ClassUtil" >>>>>> = kryo"org.iceengine.utils.DataBaseUtilUtil" = >>>>>> kryo"org.iceengine.utils.EngineXMLUtils" = >>>>>> kryo"org.iceengine.utils.FileUtil" = >>>>>> kryo"org.iceengine.utils.ICEDataUtil" >>>>>> = kryo"org.iceengine.utils.ICEEngineConstants" = >>>>>> kryo"org.iceengine.utils.ICEUtilsObject" = >>>>>> kryo"org.iceengine.utils.IceUtilsRes" = >>>>>> kryo"org.iceengine.utils.ICEUtilsString" = >>>>>> kryo"org.apache.commons.collections.OrderedMap" = >>>>>> kryo"org.apache.commons.collections.map.LinkedMap" = kryo"java.io.File" = >>>>>> kryo"java.util.HashMap" = >>>>>> kryo"org.iceengine.compare.engine.ICECompare$CompareType" = >>>>>> kryo"org.iceengine.common.CompareChain" = kryo >>>>>> "org.apache.commons.collections.comparators.ComparatorChain" = kryo >>>>>> "[Lorg.apache.commons.collections.comparators.ComparatorChain;" = kryo >>>>>> "java.util.ArrayList" = kryo "java.util.BitSet" = kryo >>>>>> "[Lorg.iceengine.compare.engine.ICEUniqueSource;" = kryo >>>>>> "[Lorg.iceengine.common.MapCont;" = kryo >>>>>> "org.iceengine.compare.engine.ICEEngineContext$RuleUKey" = kryo >>>>>> "java.util.concurrent.atomic.AtomicLong" = kryo >>>>>> "java.util.concurrent.ConcurrentHashMap" = kryo >>>>>> "java.util.concurrent.atomic.AtomicLongArray" = kryo >>>>>> "java.util.concurrent.atomic.AtomicBoolean" = kryo >>>>>> "java.text.SimpleDateFormat" = kryo "java.util.GregorianCalendar" >>>>>> = >>>>>> kryo "java.util.Date" = kryo "java.text.DateFormatSymbols" = >>>>>> kryo "javax.script.ScriptEngineManager" = kryo } } cluster { >>>>>> seed-nodes = [ "akka://[email protected]:2551 >>>>>> <http://[email protected]:2551>", >>>>>> "akka://[email protected]:2552 >>>>>> <http://[email protected]:2552>"] #auto-down-unreachable-after >>>>>> = >>>>>> 10s }actor.allow-java-serialization = offactor.deployment { "/*/*" { >>>>>> # Router type provided by metrics extension. #router = >>>>>> cluster-metrics-adaptive-group router = round-robin-group # Router >>>>>> parameter specific for metrics extension. # metrics-selector = heap >>>>>> # >>>>>> metrics-selector = load # metrics-selector = cpu metrics-selector = >>>>>> mix # routees.paths = ["/user/expEvaluationBackend"] >>>>>> nr-of-instances = 100 cluster { enabled = on use-role = >>>>>> backend max-nr-of-instances-per-node = 3 allow-local-routees = >>>>>> off } } }worker-dispatcher { type = Dispatcher executor = >>>>>> "fork-join-executor" fork-join-executor { parallelism-min = 2 >>>>>> parallelism-factor = 2.0 parallelism-max = 64 } throughput = 5 >>>>>> } # Disable legacy metrics in >>>>>> akka-cluster.cluster.metrics.enabled=offextensions=[ >>>>>> "akka.cluster.metrics.ClusterMetricsExtension", >>>>>> "com.romix.akka.serialization.kryo.KryoSerializationExtension$" ] }* >>>>>> On Monday, May 8, 2017 at 6:37:17 PM UTC+5:30, Patrik Nordwall wrote: >>>>>>> >>>>>>> the port number for the seed-nodes does not match canonical.port = >>>>>>> 25520 >>>>>>> >>>>>>> replacing akka.tcp with akka is correct, and if you have that in the >>>>>>> code somewhere it must be changed there also >>>>>>> >>>>>>> On Mon, May 8, 2017 at 2:36 PM, Kunal Ghosh <[email protected]> >>>>>>> wrote: >>>>>>> >>>>>>>> Thanks @Patrik Your help is much appreciated !! >>>>>>>> >>>>>>>> Below are my configuration for Kryo Serialization and Artery remote >>>>>>>> implementation in application.conf file. Please go through it and tell >>>>>>>> me >>>>>>>> whether is it correct ?? >>>>>>>> Also I have a question that changing configuration is enough or I >>>>>>>> will have to make changes in the code as well? >>>>>>>> >>>>>>>> *application.conf* >>>>>>>> >>>>>>>> *akka {* >>>>>>>> >>>>>>>> * loggers = ["akka.event.slf4j.Slf4jLogger"]* >>>>>>>> * loglevel = "DEBUG"* >>>>>>>> >>>>>>>> * stdout-loglevel = "DEBUG"* >>>>>>>> >>>>>>>> * logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"* >>>>>>>> >>>>>>>> * actor.provider = "akka.cluster.ClusterActorRefProvider"* >>>>>>>> >>>>>>>> *# Artery remoting implementation* >>>>>>>> >>>>>>>> *remote {* >>>>>>>> * log-remote-lifecycle-events = on* >>>>>>>> * log-sent-messages = on* >>>>>>>> * log-received-messages = on* >>>>>>>> * artery {* >>>>>>>> * enabled = on* >>>>>>>> * canonical.hostname = "192.168.100.199"* >>>>>>>> * canonical.port = 25520* >>>>>>>> * }* >>>>>>>> * }* >>>>>>>> >>>>>>>> *# **KryoSerializer Configuration* >>>>>>>> >>>>>>>> * actor {* >>>>>>>> * kryo {* >>>>>>>> * type = "graph"* >>>>>>>> * idstrategy = "incremental"* >>>>>>>> * buffer-size = 4096* >>>>>>>> * max-buffer-size = -1* >>>>>>>> * use-manifest = false* >>>>>>>> * implicit-registration-logging = true* >>>>>>>> * kryo-trace = true* >>>>>>>> >>>>>>>> * mappings {* >>>>>>>> * "org.iceengine.compare.engine.ICEEngineContext" = 32* >>>>>>>> * "org.iceengine.compare.akka.Ro >>>>>>>> <http://org.iceengine.compare.akka.Ro>wData" = 33* >>>>>>>> * "org.iceengine.compare.akka.DataConsumerInspector" = 34* >>>>>>>> * "org.iceengine.compare.akka.Re >>>>>>>> <http://org.iceengine.compare.akka.Re>sult" = 35* >>>>>>>> * }* >>>>>>>> * }* >>>>>>>> >>>>>>>> * serialize-messages = on* >>>>>>>> * serializers {* >>>>>>>> * #java = "akka.serialization.JavaSerializer"* >>>>>>>> * kryo = "com.romix.akka.serialization.kryo.KryoSerializer"* >>>>>>>> * }* >>>>>>>> * serialization-bindings {* >>>>>>>> * "org.iceengine.compare.engine.ICEEngineContext" = kryo* >>>>>>>> * "org.iceengine.compare.akka.Ro >>>>>>>> <http://org.iceengine.compare.akka.Ro>wData" = kryo* >>>>>>>> * "org.iceengine.compare.akka.DataConsumerInspector" = kryo* >>>>>>>> * "org.iceengine.compare.akka.Re >>>>>>>> <http://org.iceengine.compare.akka.Re>sult" = kryo* >>>>>>>> * }* >>>>>>>> * }* >>>>>>>> >>>>>>>> * cluster {* >>>>>>>> * seed-nodes = [* >>>>>>>> * "akka://[email protected]:2551 >>>>>>>> <http://[email protected]:2551>",* >>>>>>>> * "akka://[email protected]:2552 >>>>>>>> <http://[email protected]:2552>"]* >>>>>>>> >>>>>>>> * #auto-down-unreachable-after = 10s* >>>>>>>> * }* >>>>>>>> >>>>>>>> *akka.cluster.min-nr-of-members =3* >>>>>>>> *# //#min-nr-of-members* >>>>>>>> >>>>>>>> *# //#role-min-nr-of-members* >>>>>>>> *akka.cluster.role {* >>>>>>>> * frontend.min-nr-of-members = 1* >>>>>>>> * backend.min-nr-of-members = 2* >>>>>>>> *}* >>>>>>>> *actor.allow-java-serialization = off* >>>>>>>> *actor.deployment {* >>>>>>>> >>>>>>>> * "/*/*" {* >>>>>>>> >>>>>>>> * # Router type provided by metrics extension. * >>>>>>>> * #router = cluster-metrics-adaptive-group* >>>>>>>> * router = round-robin-group* >>>>>>>> * # Router parameter specific for metrics extension.* >>>>>>>> * # metrics-selector = heap* >>>>>>>> * # metrics-selector = load* >>>>>>>> * # metrics-selector = cpu* >>>>>>>> * metrics-selector = mix* >>>>>>>> * #* >>>>>>>> * routees.paths = ["/user/expEvaluationBackend"]* >>>>>>>> * nr-of-instances = 100* >>>>>>>> * cluster {* >>>>>>>> * enabled = on* >>>>>>>> * use-role = backend* >>>>>>>> * max-nr-of-instances-per-node = 3* >>>>>>>> * allow-local-routees = off* >>>>>>>> * }* >>>>>>>> * }* >>>>>>>> >>>>>>>> *}* >>>>>>>> * # Disable legacy metrics in akka-cluster.* >>>>>>>> *cluster.metrics.enabled=off* >>>>>>>> >>>>>>>> *# Enable metrics extension in akka-cluster-metrics.* >>>>>>>> *extensions=[* >>>>>>>> * "akka.cluster.metrics.ClusterMetricsExtension",* >>>>>>>> * "com.romix.akka.serialization.kryo.KryoSerializationExtension$"* >>>>>>>> * ]* >>>>>>>> >>>>>>>> >>>>>>>> *}* >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Saturday, May 6, 2017 at 5:56:36 PM UTC+5:30, Patrik Nordwall >>>>>>>> wrote: >>>>>>>>> >>>>>>>>> First, don't use java serialization for performance and security >>>>>>>>> reasons. Secondly, actor messages should be small (a few 100kB at >>>>>>>>> most). >>>>>>>>> Otherwise they will prevent other messages to get through, such as >>>>>>>>> cluster >>>>>>>>> heartbeat messages. Split the large message into smaller messages, or >>>>>>>>> transfer it on a side channel such as Akka Http or Stream TCP. I'd >>>>>>>>> also >>>>>>>>> recommend that you try the new remoting implementatio, see Artery in >>>>>>>>> docs. >>>>>>>>> >>>>>>>>> /Patrik >>>>>>>>> fre 5 maj 2017 kl. 16:44 skrev Kunal Ghosh <[email protected]>: >>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> my application uses a Akka cluster which has one master node and >>>>>>>>>> two child seed nodes. The master node reads data from input file and >>>>>>>>>> sends >>>>>>>>>> it over to both child nodes for evaluation (processing). >>>>>>>>>> The application works fine for smaller data file eg. file with 43 >>>>>>>>>> rows but when the input file is hug like with 2 million rows the >>>>>>>>>> application fails. The exception thrown with stack trace is given >>>>>>>>>> below. >>>>>>>>>> I have also attached the configuration file and code examples are >>>>>>>>>> attached with this mail please do check them out and tell where I am >>>>>>>>>> wrong >>>>>>>>>> ???? >>>>>>>>>> Thanks in advance. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> WARN [18:48:19.013]{iCEDQApp-akka.actor.default-dispatcher-22}(Sl >>>>>>>>>> f4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using >>>>>>>>>> the default Java serializer for class [ >>>>>>>>>> org.iceengine.compare.akka.RowData] which is not recommended >>>>>>>>>> because of performance implications. Use another serializer or >>>>>>>>>> disable this >>>>>>>>>> warning using the setting 'akka.actor.warn-about-java-se >>>>>>>>>> rializer-usage' >>>>>>>>>> WARN [18:48:21.768]{iCEDQApp-akka.actor.default-dispatcher-28}(Sl >>>>>>>>>> f4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using >>>>>>>>>> the default Java serializer for class [ >>>>>>>>>> org.iceengine.compare.akka.Result] which is not recommended >>>>>>>>>> because of performance implications. Use another serializer or >>>>>>>>>> disable this >>>>>>>>>> warning using the setting 'akka.actor.warn-about-java-se >>>>>>>>>> rializer-usage' >>>>>>>>>> WARN [18:48:21.813]{iCEDQApp-akka.actor.default-dispatcher-4}(Slf >>>>>>>>>> 4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using >>>>>>>>>> the default Java serializer for class [ >>>>>>>>>> org.iceengine.compare.akka.Result] which is not recommended >>>>>>>>>> because of performance implications. Use another serializer or >>>>>>>>>> disable this >>>>>>>>>> warning using the setting 'akka.actor.warn-about-java-se >>>>>>>>>> rializer-usage' >>>>>>>>>> WARN [18:48:23.002]{iCEDQApp-akka.actor.default-dispatcher-3}(Slf >>>>>>>>>> 4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Cluster >>>>>>>>>> Node [akka.tcp://[email protected]:2551] - Marking >>>>>>>>>> node(s) as UNREACHABLE [Member(address = akka.tcp:// >>>>>>>>>> [email protected]:62915, status = Up)]. Node roles >>>>>>>>>> [backend] >>>>>>>>>> WARN [18:48:23.058]{iCEDQApp-akka.actor.default-dispatcher-17}(Sl >>>>>>>>>> f4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Cluster >>>>>>>>>> Node [akka.tcp://[email protected]:62915] - Marking >>>>>>>>>> node(s) as UNREACHABLE [Member(address = akka.tcp:// >>>>>>>>>> [email protected]:2551, status = Up)]. Node roles [] >>>>>>>>>> Kunal_ICE ERROR[18:48:23.473]{iCEDQApp-a >>>>>>>>>> kka.actor.default-dispatcher-24}(Slf4jLogger$$anonfun$receiv >>>>>>>>>> e$1$$anonfun$applyOrElse$1.apply$mcV$sp:70)-AssociationError >>>>>>>>>> [akka.tcp://[email protected]:2552] <- [akka.tcp:// >>>>>>>>>> [email protected]:62915]: Error [null] [ >>>>>>>>>> java.io.OptionalDataException >>>>>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java >>>>>>>>>> :1373) >>>>>>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java: >>>>>>>>>> 373) >>>>>>>>>> at java.util.HashMap.readObject(HashMap.java:1402) >>>>>>>>>> at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source) >>>>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe >>>>>>>>>> thodAccessorImpl.java:43) >>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>>>>>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass >>>>>>>>>> .java:1058) >>>>>>>>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.j >>>>>>>>>> ava:1909) >>>>>>>>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre >>>>>>>>>> am.java:1808) >>>>>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java >>>>>>>>>> :1353) >>>>>>>>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea >>>>>>>>>> m.java:2018) >>>>>>>>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.j >>>>>>>>>> ava:1942) >>>>>>>>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre >>>>>>>>>> am.java:1808) >>>>>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java >>>>>>>>>> :1353) >>>>>>>>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea >>>>>>>>>> m.java:2018) >>>>>>>>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.j >>>>>>>>>> ava:1942) >>>>>>>>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre >>>>>>>>>> am.java:1808) >>>>>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java >>>>>>>>>> :1353) >>>>>>>>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea >>>>>>>>>> m.java:2018) >>>>>>>>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.j >>>>>>>>>> ava:1942) >>>>>>>>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre >>>>>>>>>> am.java:1808) >>>>>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java >>>>>>>>>> :1353) >>>>>>>>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea >>>>>>>>>> m.java:2018) >>>>>>>>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.j >>>>>>>>>> ava:1942) >>>>>>>>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre >>>>>>>>>> am.java:1808) >>>>>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java >>>>>>>>>> :1353) >>>>>>>>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea >>>>>>>>>> m.java:2018) >>>>>>>>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.j >>>>>>>>>> ava:1942) >>>>>>>>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre >>>>>>>>>> am.java:1808) >>>>>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java >>>>>>>>>> :1353) >>>>>>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java: >>>>>>>>>> 373) >>>>>>>>>> at akka.serialization.JavaSerializer$$anonfun$1.apply(Serialize >>>>>>>>>> r.scala:304) >>>>>>>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:5 >>>>>>>>>> 8) >>>>>>>>>> at akka.serialization.JavaSerializer.fromBinary(Serializer.scal >>>>>>>>>> a:304) >>>>>>>>>> at akka.serialization.Serialization.akka$serialization$Serializ >>>>>>>>>> ation$$deserializeByteArray(Serialization.scala:151) >>>>>>>>>> at akka.serialization.Serialization$$anonfun$deserialize$2.appl >>>>>>>>>> y(Serialization.scala:137) >>>>>>>>>> at scala.util.Try$.apply(Try.scala:192) >>>>>>>>>> at akka.serialization.Serialization.deserialize(Serialization.s >>>>>>>>>> cala:131) >>>>>>>>>> at akka.remote.serialization.MessageContainerSerializer.fromBin >>>>>>>>>> ary(MessageContainerSerializer.scala:80) >>>>>>>>>> at akka.serialization.Serialization.akka$serialization$Serializ >>>>>>>>>> ation$$deserializeByteArray(Serialization.scala:151) >>>>>>>>>> at akka.serialization.Serialization$$anonfun$deserialize$2.appl >>>>>>>>>> y(Serialization.scala:137) >>>>>>>>>> at scala.util.Try$.apply(Try.scala:192) >>>>>>>>>> at akka.serialization.Serialization.deserialize(Serialization.s >>>>>>>>>> cala:131) >>>>>>>>>> at akka.remote.MessageSerializer$.deserialize(MessageSerializer >>>>>>>>>> .scala:30) >>>>>>>>>> at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(En >>>>>>>>>> dpoint.scala:64) >>>>>>>>>> at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scal >>>>>>>>>> a:64) >>>>>>>>>> at akka.remote.DefaultMessageDispatcher.msgLog$1(Endpoint.scala >>>>>>>>>> :69) >>>>>>>>>> at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala >>>>>>>>>> :81) >>>>>>>>>> at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(En >>>>>>>>>> dpoint.scala:988) >>>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:496) >>>>>>>>>> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452) >>>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) >>>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495) >>>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) >>>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224) >>>>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234) >>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.j >>>>>>>>>> ava:260) >>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(For >>>>>>>>>> kJoinPool.java:1339) >>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo >>>>>>>>>> l.java:1979) >>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW >>>>>>>>>> orkerThread.java:107) >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> *Front End Class* >>>>>>>>>> >>>>>>>>>> ======================= >>>>>>>>>> ActorSystem system = ActorSystem.create("iCEDQApp", >>>>>>>>>> ConfigFactory.load()); >>>>>>>>>> >>>>>>>>>> System.out.println("IceCompareEngine ============ >>>>>> >>>>>>>>>> "+context_._ruleType); >>>>>>>>>> ClusterRegisterOnMemberUp registerUp = new >>>>>>>>>> ClusterRegisterOnMemberUp(actors,context_.getRiid(),context_ >>>>>>>>>> ,system,context_._ruleType); >>>>>>>>>> FutureTask<ActorRef> futureTask = new >>>>>>>>>> FutureTask<ActorRef>(registerUp); >>>>>>>>>> >>>>>>>>>> // ExecutorService executor = Executors.newFixedThreadPool(1); >>>>>>>>>> // executor.execute(futureTask); >>>>>>>>>> Cluster.get(system).registerOnMemberUp(futureTask); >>>>>>>>>> while (true){ >>>>>>>>>> try{ >>>>>>>>>> if(futureTask.isDone()){ >>>>>>>>>> System.out.println(">>>>>>>>>>>>>>>>>> done >>>>>>>>>>>>>> >>>>>>>>>> "); >>>>>>>>>> break; >>>>>>>>>> } >>>>>>>>>> }catch (Exception e) { >>>>>>>>>> // TODO: handle exception >>>>>>>>>> } >>>>>>>>>> } >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c >>>>>>>>>> urrent/additional/faq.html >>>>>>>>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou >>>>>>>>>> p/akka-user >>>>>>>>>> --- >>>>>>>>>> You received this message because you are subscribed to the >>>>>>>>>> Google Groups "Akka User List" group. >>>>>>>>>> To unsubscribe from this group and stop receiving emails from it, >>>>>>>>>> send an email to [email protected]. >>>>>>>>>> To post to this group, send email to [email protected]. >>>>>>>>>> Visit this group at https://groups.google.com/group/akka-user. >>>>>>>>>> For more options, visit https://groups.google.com/d/optout. >>>>>>>>>> >>>>>>>>> -- >>>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c >>>>>>>> urrent/additional/faq.html >>>>>>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou >>>>>>>> p/akka-user >>>>>>>> --- >>>>>>>> You received this message because you are subscribed to the Google >>>>>>>> Groups "Akka User List" group. >>>>>>>> To unsubscribe from this group and stop receiving emails from it, >>>>>>>> send an email to [email protected]. >>>>>>>> To post to this group, send email to [email protected]. >>>>>>>> Visit this group at https://groups.google.com/group/akka-user. >>>>>>>> For more options, visit https://groups.google.com/d/optout. >>>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> >>>>>>> Patrik Nordwall >>>>>>> Akka Tech Lead >>>>>>> Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM >>>>>>> Twitter: @patriknw >>>>>>> >>>>>>> -- >>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c >>>>> urrent/additional/faq.html >>>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou >>>>> p/akka-user >>>>> --- >>>>> You received this message because you are subscribed to the Google >>>>> Groups "Akka User List" group. >>>>> To unsubscribe from this group and stop receiving emails from it, send >>>>> an email to [email protected]. >>>>> To post to this group, send email to [email protected]. >>>>> Visit this group at https://groups.google.com/group/akka-user. >>>>> For more options, visit https://groups.google.com/d/optout. >>>>> >>>> >>>> >>>> >>>> -- >>>> >>>> Patrik Nordwall >>>> Akka Tech Lead >>>> Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM >>>> Twitter: @patriknw >>>> >>>> -- >>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c >>> urrent/additional/faq.html >>> >>>>>>>>>> Search the archives: https://groups.google.com/grou >>> p/akka-user >>> --- >>> You received this message because you are subscribed to the Google >>> Groups "Akka User List" group. >>> To unsubscribe from this group and stop receiving emails from it, send >>> an email to [email protected]. >>> To post to this group, send email to [email protected]. >>> Visit this group at https://groups.google.com/group/akka-user. >>> For more options, visit https://groups.google.com/d/optout. >>> >> >> >> >> -- >> >> Patrik Nordwall >> Akka Tech Lead >> Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM >> Twitter: @patriknw >> >> -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c > urrent/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at https://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- Patrik Nordwall Akka Tech Lead Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM Twitter: @patriknw -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
