Hi,
my application uses an Akka cluster with one master node and two child 
seed nodes. The master node reads data from an input file and sends it to 
both child nodes for evaluation (processing).
The application works fine for smaller data files, e.g. a file with 43 rows, 
but when the input file is huge, e.g. 2 million rows, the application fails. 
The exception thrown, with its stack trace, is given below.
I have also attached the configuration file and code examples to this mail. 
Please check them and tell me where I am going wrong.
Thanks in advance.




WARN [18:48:19.013]{iCEDQApp-akka.actor.default-dispatcher-22}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using the default Java serializer for class [org.iceengine.compare.akka.RowData] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
WARN [18:48:21.768]{iCEDQApp-akka.actor.default-dispatcher-28}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using the default Java serializer for class [org.iceengine.compare.akka.Result] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
WARN [18:48:21.813]{iCEDQApp-akka.actor.default-dispatcher-4}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using the default Java serializer for class [org.iceengine.compare.akka.Result] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
WARN [18:48:23.002]{iCEDQApp-akka.actor.default-dispatcher-3}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Cluster Node [akka.tcp://iCEDQApp@192.168.100.199:2551] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://iCEDQApp@192.168.100.199:62915, status = Up)]. Node roles [backend]
WARN [18:48:23.058]{iCEDQApp-akka.actor.default-dispatcher-17}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Cluster Node [akka.tcp://iCEDQApp@192.168.100.199:62915] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://iCEDQApp@192.168.100.199:2551, status = Up)]. Node roles []
 Kunal_ICE 
ERROR[18:48:23.473]{iCEDQApp-akka.actor.default-dispatcher-24}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$1.apply$mcV$sp:70)-AssociationError [akka.tcp://iCEDQApp@192.168.100.199:2552] <- [akka.tcp://iCEDQApp@192.168.100.199:62915]: Error [null] [
java.io.OptionalDataException
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1373)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
 at java.util.HashMap.readObject(HashMap.java:1402)
 at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
 at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:304)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
 at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:304)
 at akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:151)
 at akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:137)
 at scala.util.Try$.apply(Try.scala:192)
 at akka.serialization.Serialization.deserialize(Serialization.scala:131)
 at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:80)
 at akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:151)
 at akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:137)
 at scala.util.Try$.apply(Try.scala:192)
 at akka.serialization.Serialization.deserialize(Serialization.scala:131)
 at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:30)
 at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:64)
 at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:64)
 at akka.remote.DefaultMessageDispatcher.msgLog$1(Endpoint.scala:69)
 at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:81)
 at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:988)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:496)
 at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
 at akka.actor.ActorCell.invoke(ActorCell.scala:495)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
 at akka.dispatch.Mailbox.run(Mailbox.scala:224)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
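A `java.io.OptionalDataException` raised inside `HashMap.readObject` usually means the serialized byte stream itself is inconsistent; a common cause is mutating a `HashMap` (or an object graph containing one, such as a shared context referenced by every `RowData`) on one thread while Java serialization is still writing it on another. A minimal sketch of the defensive-copy idea — the `RowSnapshot` class and its fields are hypothetical, not part of the application above:

```java
import java.io.*;
import java.util.*;

// Hypothetical message class illustrating the defensive-copy idea: the map is
// snapshotted and frozen at construction time, so later mutations of the live
// map cannot race with Java serialization of the message.
public class RowSnapshot implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Map<String, Object> columns;

    public RowSnapshot(Map<String, Object> liveColumns) {
        // Copy, then freeze: serialization can never observe a half-updated map.
        this.columns = Collections.unmodifiableMap(new HashMap<>(liveColumns));
    }

    public Map<String, Object> getColumns() { return columns; }

    // Serialize and deserialize the message, as remoting would.
    public static RowSnapshot roundTrip(RowSnapshot in) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(in);
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (RowSnapshot) ois.readObject();
        }
    }

    // Demo: the message keeps its snapshot even though the live map changes later.
    public static int demo() throws Exception {
        Map<String, Object> live = new HashMap<>();
        live.put("col1", "value1");
        RowSnapshot msg = new RowSnapshot(live);
        live.put("col2", "value2"); // no effect on the already-built message
        return roundTrip(msg).getColumns().size();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints 1
    }
}
```

If `RowData` or the `ICEEngineContext` it carries holds live, shared maps, snapshotting them before the `tell` (or switching to a dedicated serializer) should remove this class of failure.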


*Front End Class*

=======================
ActorSystem system = ActorSystem.create("iCEDQApp", ConfigFactory.load());

System.out.println("IceCompareEngine ============ >>>>>> " + context_._ruleType);
ClusterRegisterOnMemberUp registerUp = new ClusterRegisterOnMemberUp(actors, context_.getRiid(), context_, system, context_._ruleType);
FutureTask<ActorRef> futureTask = new FutureTask<ActorRef>(registerUp);

// ExecutorService executor = Executors.newFixedThreadPool(1);
// executor.execute(futureTask);
Cluster.get(system).registerOnMemberUp(futureTask);
// Busy-waits until the cluster callback has executed the task
while (true) {
    if (futureTask.isDone()) {
        System.out.println(">>>>>>>>>>>>>>>>>> done >>>>>>>>>>>>>> ");
        break;
    }
}
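The `while (true)` loop above spins a full CPU core while waiting. `FutureTask` already supports blocking until the result is available, so the wait can be expressed with `get()`. A minimal stdlib-only sketch, using `String` in place of `ActorRef` so it runs without Akka:

```java
import java.util.concurrent.*;

// Sketch of the waiting pattern without busy-spinning: get() parks the caller
// until the cluster callback has executed the task. String stands in for ActorRef.
public class FutureTaskWait {
    public static String awaitResult(FutureTask<String> task, long timeoutSeconds)
            throws InterruptedException, ExecutionException, TimeoutException {
        return task.get(timeoutSeconds, TimeUnit.SECONDS);
    }

    public static String demo() throws Exception {
        FutureTask<String> task = new FutureTask<>(() -> "master-ref");
        new Thread(task).start(); // stands in for Cluster.registerOnMemberUp(task)
        return awaitResult(task, 10);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints master-ref
    }
}
```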

*Configuration File*

=======================
include "application"

# //#min-nr-of-members
akka.cluster.min-nr-of-members =2
# //#min-nr-of-members

# //#role-min-nr-of-members
akka.cluster.role {
  #frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}
# //#role-min-nr-of-members

# //#adaptive-router
akka.actor.deployment {
  "/*/*" {
    # Router type provided by metrics extension. 
    router = round-robin-pool
   
    dispatcher = worker-dispatcher
    routees.paths = ["/user/expEvaluationBackend"]
    nr-of-instances = 100
    cluster {
      enabled = on
      use-role = backend
      max-nr-of-instances-per-node = 3
      allow-local-routees = off
    }
  }
}
worker-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-factor = 2.0
    parallelism-max = 64
  }
  throughput = 5
}
akka {

# Loggers to register at boot time (akka.event.Logging$DefaultLogger logs
  # to STDOUT)
  loggers = ["akka.event.slf4j.Slf4jLogger"]
 
  # Log level used by the configured loggers (see "loggers") as soon
  # as they have been started; before that, see "stdout-loglevel"
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  loglevel = "DEBUG"
 
  # Log level for the very basic logger activated during ActorSystem startup.
  # This logger prints the log messages to stdout (System.out).
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  stdout-loglevel = "ERROR"
 
  # Filter of log events that is used by the LoggingAdapter before
  # publishing log events to the eventStream.
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  
  actor.provider = "akka.cluster.ClusterActorRefProvider"

remote {
    log-remote-lifecycle-events = on
    log-sent-messages = on
    log-received-messages = on
    netty.tcp {
      hostname = "192.168.100.199"
      port = 0
    }
    
  }
  cluster {
    seed-nodes = [
      "akka.tcp://iCEDQApp@192.168.100.199:2551",
      "akka.tcp://iCEDQApp@192.168.100.199:2552"]

    #auto-down-unreachable-after = 10s
  }

  # Note: these settings are inside the akka { } block, so the keys must not
  # repeat the "akka." prefix (akka.cluster... would resolve to akka.akka.cluster...).
  cluster.min-nr-of-members = 3
# //#min-nr-of-members

# //#role-min-nr-of-members
  cluster.role {
    frontend.min-nr-of-members = 1
    backend.min-nr-of-members = 2
  }
actor.deployment {
        
  "/*/*" {
        
    # Router type provided by metrics extension. 
    #router = cluster-metrics-adaptive-group
    router = round-robin-group
    # Router parameter specific for metrics extension.
    # metrics-selector = heap
    # metrics-selector = load
    # metrics-selector = cpu
    metrics-selector = mix
    #
    routees.paths = ["/user/expEvaluationBackend"]
    nr-of-instances = 100
    cluster {
      enabled = on
      use-role = backend
      max-nr-of-instances-per-node = 3
      allow-local-routees = off
    }
  }
 
}
  # Disable legacy metrics in akka-cluster.
cluster.metrics.enabled=off

# Enable metrics extension in akka-cluster-metrics.
extensions=["akka.cluster.metrics.ClusterMetricsExtension"]

  
}
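The WARN lines show that `RowData` and `Result` still go through the default Java serializer, which is both slow and (as here) fragile at 2 million rows. A sketch of how a non-Java serializer could be bound in this configuration, assuming the akka-kryo-serialization library were added to the classpath (the serializer class name is that library's, not part of the application above):

```
akka.actor {
  serializers {
    # Assumes the akka-kryo-serialization dependency is on the classpath.
    kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
  }
  serialization-bindings {
    # Route the application messages away from Java serialization.
    "org.iceengine.compare.akka.RowData" = kryo
    "org.iceengine.compare.akka.Result" = kryo
  }
}
```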
*ClusterRegisterOnMemberUp Class*

=======================
package org.iceengine.compare.akka;

import java.util.concurrent.Callable;

import org.iceengine.compare.engine.ICEEngineContext;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class ClusterRegisterOnMemberUp implements Callable<ActorRef>{
	
	
	
		private int noOfInstance;
		private long riid;
		private ICEEngineContext context;
		private ActorSystem system;
		private String _ruleType;
		
		public ClusterRegisterOnMemberUp(int noOfInstance, long riid, ICEEngineContext context, ActorSystem system, String _ruleType) {
			super();
			this.noOfInstance = noOfInstance;
			this.riid = riid;
			this.context = context;
			this.system = system;
			this._ruleType = _ruleType;
		}

		@Override
		public ActorRef call() throws Exception{
//			ActorRef master = null;
//			if("report".equalsIgnoreCase(_ruleType))
//				master = system.actorOf(Props.create(MasterForReportRule.class,noOfInstance, riid), "master_"+riid);
//			else if("recon".equalsIgnoreCase(_ruleType))
//				master = system.actorOf(Props.create(MasterForColumnDiffRule.class,noOfInstance, riid), "master_"+riid);
			ActorRef master = system.actorOf(Props.create(Master.class,noOfInstance, riid,_ruleType), "master_"+riid);
			master.tell(context, ActorRef.noSender());
			while(!master.isTerminated()){
				try{
					Thread.sleep(100);
				}catch(Exception e){
					e.printStackTrace();
				}
			}
			return master;
			//_system.actorOf(Props.create(ExpEvaluationFrontend.class, fileName, nrOfInstances,nrOfExpression),"expEvaluationFrontend");
		}
	

}
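`call()` above polls `master.isTerminated()` every 100 ms; the Akka-idiomatic alternative is to watch the master actor and react to its `Terminated` message. As a library-free illustration of the same waiting pattern, a `CountDownLatch` lets the finishing side signal completion instead of being polled (class and method names here are hypothetical):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Library-free sketch: the "master" signals completion through a latch instead
// of being polled with Thread.sleep(100) in a loop.
public class TerminationWait {
    private final CountDownLatch done = new CountDownLatch(1);

    public void markTerminated() { done.countDown(); }

    // Parks the caller; wakes immediately when markTerminated() runs.
    public boolean awaitTermination(long timeoutSeconds) throws InterruptedException {
        return done.await(timeoutSeconds, TimeUnit.SECONDS);
    }

    public static boolean demo() throws InterruptedException {
        TerminationWait wait = new TerminationWait();
        new Thread(wait::markTerminated).start(); // stands in for the master stopping
        return wait.awaitTermination(10);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints true
    }
}
```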
*DataEvaluatorActor Class*

=======================
package org.iceengine.compare.akka;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

import org.apache.commons.lang.ArrayUtils;
import org.iceengine.compare.engine.CommonRowObj;
import org.iceengine.compare.engine.ICECompare;
import org.iceengine.compare.engine.ICECompareEngine;
import org.iceengine.compare.engine.ICECompareRow;
import org.iceengine.compare.engine.ICEComparison;
import org.iceengine.compare.engine.ICEEngineContext;
import org.iceengine.compare.engine.SRCTRGSide;
import org.iceengine.compare.engine.WritterInterface;

import akka.actor.UntypedActor;

public class DataEvaluatorActor extends UntypedActor {
	//private final Object dummyMaster = new Object();
	private ScriptEngine gEngine;
	DataEvaluatorActor(){
		ScriptEngineManager manager = new ScriptEngineManager();
		gEngine = manager.getEngineByName("groovy");
		System.out.println("111111111111111111111111111111111111111111111");
		
	}

	@Override
	public void onReceive(Object message) {
		//System.out.println("============ onreceive worker ===================");

		if( message instanceof RowData )
		{
			
			RowData rowData = ( RowData )message;
			
			try{
				if("report".equalsIgnoreCase(rowData.getContext()._ruleType)){
					new ICECompareEngine().compareReportRuleExprGroovy(rowData.getSrc_(), rowData.getLhsColumns()
							,rowData.getRhsColumns(),rowData.getReplacedColumnsMap(),rowData.getExpressions()
							,rowData.getExpressions_Groovy(),rowData.getExpressions_Org(),rowData.getContext()
							,rowData.isDisplayRecord(),rowData.isOneSided(),rowData.isJoinFailed()
							,rowData.getRuleType(), rowData.getSrcBindings(),gEngine
							,rowData.getOneSide(), rowData.getErrorInfo());
				}else{
					if(rowData.isOneSided()){
						this.writeRowError(rowData.getSrc_(), rowData.getOneSide(), rowData.getContext(), rowData.getContext()._sink
								, "", rowData.isRecordRow(), ((rowData.getOneSide() == SRCTRGSide.LEFT_INDEX) ? rowData.getLhsErrorInfo() : rowData.getRhsErrorInfo()), rowData.isOneSided());

					}else{
						if(rowData.getComparison() < 0){
							this.writeRowError(rowData.getSrc_(), SRCTRGSide.LEFT_INDEX, rowData.getContext(), rowData.getContext()._sink
									, "", rowData.isRecordRow(),  rowData.getLhsErrorInfo(), rowData.isJoinFailed());
						}else if(rowData.getComparison() > 0){
							this.writeRowError(rowData.getTrg_(), SRCTRGSide.RIGHT_INDEX, rowData.getContext(), rowData.getContext()._sink
									, "", rowData.isRecordRow(),  rowData.getRhsErrorInfo(), rowData.isJoinFailed());
						}else{
							new ICECompareEngine().compareColumnDiffRuleExprGroovy(rowData.getSrc_(), rowData.getTrg_(), rowData.getLhsColumns()
									,rowData.getRhsColumns(),rowData.getReplacedColumnsMap(),rowData.getExpressions()
									,rowData.getExpressions_Groovy(),rowData.getExpressions_Org(),rowData.getContext()
									,rowData.isDisplayRecord(),rowData.isOneSided(),rowData.isJoinFailed()
									,rowData.getRuleType(), rowData.getSrcBindings(),gEngine
									,rowData.getOneSide(), rowData.getLhsErrorInfo(),rowData.getRhsErrorInfo());
						}
					}
				}
			}catch (Exception e) {
				// TODO: handle exception
			}finally{
				//Result result = new Result();
				getSender().tell( rowData.getResult(), getSelf() );
			}
		}else if (message instanceof DataConsumerInspector) {

			DataConsumerInspector status = (DataConsumerInspector) message;

			getSender().tell( status.getResult(), getSelf() );

		}
		else{
			//System.out.println("======================= unhahandled message worker");
			unhandled(message);
		}

	}

	private void writeRowError(Object[] row_, int sideIdx_, ICEEngineContext context_,
			WritterInterface sink_, String exprResult, boolean recordRow, String errorInfo, boolean oneSided) throws IOException {
		ICECompare.CompareType compareType = context_._dataComparison.getCompareType();
		if (compareType == ICECompare.CompareType.COLUMN_DIFF)
			return;
		if (row_ == null)
			return;
		ICEComparison dataComparison = context_.getDataComparison();
		long rowStep = context_.getRowStep();
		SRCTRGSide side = SRCTRGSide.getEnumForConstant(sideIdx_);
		//System.out.println("======> 11111 icecomparerow row::: "+ArrayUtils.toString(row_));
		ICECompareRow rowDiff = new ICECompareRow(rowStep, row_, side, dataComparison, exprResult, recordRow, errorInfo, oneSided );
		sink_.writeLog(rowDiff, context_);
	}

	private void writeCommon(Object[] lrow_, Object[] rrow_, int lsideIdx_, int rsideIdx_, ICEEngineContext context_,
			WritterInterface sink_, String exprResult, boolean recordRow, String lhsErrorInfo, String rhsErrorInfo, boolean recordFinalExpression) throws IOException {
		ICECompare.CompareType compareType = context_._dataComparison.getCompareType();
		if (compareType == ICECompare.CompareType.COLUMN_DIFF)
			return;
		if (lrow_ == null && rrow_ == null)
			return;
		ICEComparison dataComparison = context_.getDataComparison();
		long rowStep = context_.getRowStep();
		SRCTRGSide lside = SRCTRGSide.getEnumForConstant(lsideIdx_);
		SRCTRGSide rside = SRCTRGSide.getEnumForConstant(rsideIdx_);
		CommonRowObj commonRow = new CommonRowObj(rowStep, lrow_, rrow_, lside, rside, dataComparison
				,exprResult, recordRow, lhsErrorInfo, rhsErrorInfo, recordFinalExpression);
		sink_.writeLog(commonRow, context_);
	}

}
*Main Class*

=======================
package com.ice.test;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.iceengine.compare.akka.DataEvaluatorActor;
import org.iceengine.compare.akka.MetricsListener;
import org.iceengine.compare.conf.ICEEngine;
import org.iceengine.compare.engine.ICEEngineContext;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.routing.FromConfig;
import akka.routing.RoundRobinPool;


public class Main {

	public static void main(String args[]) throws InterruptedException
	{
		System.out.println(" >>>> Cores >>> "+Runtime.getRuntime().availableProcessors());
		//ActorSystem system = ICEEngineContext.instance();
		Main.backendRun(new String[] { "2551","192.168.100.199" });
		Main.backendRun(new String[] { "2552","192.168.100.199" });
		Main.run(27256);
		/*new Thread(new Runnable() {
				@Override
				public void run() {
					Main.run(27256);
				}
			}).start();*/

		/*new Thread(new Runnable() {
				@Override
				public void run() {
					Main.run(27250);
				}
			}).start();

			new Thread(new Runnable() {
				@Override
				public void run() {
					Main.run(27230);
				}
			}).start();*/
	}
	public static void backendRun(String args[])
	{
		final String port = args.length > 0 ? args[0] : "0";
		final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
				//withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.host="+hostname)). 
				withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
				withFallback(ConfigFactory.load("ExpressionEvaluation"));
		//		    final Config config = ConfigFactory.parseString("akka.cluster.roles = [backend]").
		//		           withFallback(ConfigFactory.load("ExpressionEvaluation"));
		ActorSystem system = ActorSystem.create("iCEDQApp", config);
		System.out.println("\t\t nrOfInstances >>>>>>>>> "+FromConfig.getInstance().routerDispatcher());
		//system.actorOf(Props.create(ClientMaster.class), "expEvaluationBackend");
		//system.actorOf(Props.create(EvaluateExpressionActor.class), "expEvaluationBackend");
		// system.actorOf(new RoundRobinPool(8).props(Props.create(EvaluateExpressionActor.class)), "expEvaluationBackend");
	
		system.actorOf(Props.create(DataEvaluatorActor.class).withDispatcher("worker-dispatcher").withRouter(new RoundRobinPool(2)), "expEvaluationBackend");
		//system.actorOf(FromConfig.getInstance().props(Props.create(EvaluateExpressionActor.class)),"expEvaluationBackend");
		system.actorOf(Props.create(MetricsListener.class), "metricsListener");

		//		    system.actorOf(
		//		            ClusterSingletonManager.props(
		//		                Master.props(workTimeout),
		//		                PoisonPill.getInstance(),
		//		                ClusterSingletonManagerSettings.create(system).withRole(role)
		//		            ),
		//		            "master");

	}


	
	public static void run(int riid){
		//long riid =27256;

		org.apache.logging.log4j.Logger instanceLog = org.apache.logging.log4j.LogManager.getLogger("ICEApp");
		
//String str ="C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_27348_plan.xml";
//		String str ="C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_Validation_107740_107802.plan.xml";
//		String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_27256_plan.xml,C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_27256_lhs.dbConnectionInfo.xml,C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_27256_rhs.dbConnectionInfo.xml";
//		String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_172092_172095.plan.xml,C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_172092_172095.lhs.dbConnectionInfo.xml,C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_172092_172095.rhs.dbConnectionInfo.xml";
		
//		String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_Validation_43_Rows.plan.xml"; // 43 Rows Validation rule
//		String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_43_Rows.plan.xml";

		String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_BigFile Validation_Big_file.plan.xml"; // Big-file Validation rule (~2 million rows)
//		String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_Big_File_Rule.plan.xml";	
		try
		{
			Map<String, Object> domainSummary =  null;
			domainSummary = ICEEngine.startICEEngine(str, riid, null, instanceLog);

			System.out.println(" BPEL RUNRULE summary - start " );
			//System.out.println(" Problem Code = "+domainSummary.get("PROBLEMCODE"));
			System.out.println(" Problem Msg  = "+domainSummary.get("PROBLEMMSG"));
			System.out.println(" LHS missing count = "+domainSummary.get("LHSM"));
			System.out.println(" RHS missing count = "+domainSummary.get("RHSM"));
			System.out.println(" COMMON rows count = "+domainSummary.get("CMN"));
			System.out.println(" COLUMN DIFF COUNT = " +domainSummary.get("CDC"));
			System.out.println(" Total diff  count = "+domainSummary.get("TOTALCOUNT"));
			System.out.println(" Source Query Value = "+domainSummary.get("SRCQV"));
			System.out.println(" Target Query Value = "+domainSummary.get("TRGQV")); 
			System.out.println(" Scalar Result = "+domainSummary.get("SCALARRESULT"));
			System.out.println(" Final Result = "+domainSummary.get("FRESULT"));
			System.out.println(" Final Result By = "+domainSummary.get("FRESULTBY"));
			System.out.println(("BPEL RUNRULE summary - end "));
		} catch (Exception eee)
		{
			eee.printStackTrace();
		}catch (Error e) {
			e.printStackTrace();
			// TODO: handle exception
		}
	}

}
*Master Class*

=======================
package org.iceengine.compare.akka;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang.ArrayUtils;
import org.iceengine.compare.engine.ICEEngineContext;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.routing.FromConfig;
import akka.routing.RoundRobinPool;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;


public class Master extends UntypedActor {


	private int numberOfWorkers;
	private long rrid;
	//private ActorRef workerRouter;
	private final ActorRef dataConsumerActor;
    private final ActorRef dataEvaluatorActor;
	private long actorCompletedCounter = 0;
	private long actorSentCounterToDataEvaluatorActor = 0;
	private long start;
	private long end;
	private Result result;
	private boolean dataConsumerInspectorFlag = false;
	private String _ruleType;
	public Master(int nrOfInstances,  long rrid, String _ruleType) 
	{
		this.numberOfWorkers = nrOfInstances;
		this.rrid = rrid;
		System.out.println("NrOfInstances == "+this.numberOfWorkers);
		start = System.nanoTime();
		this.result = new Result();
		this._ruleType = _ruleType;
		if("report".equalsIgnoreCase(_ruleType)){
			dataConsumerActor = this.getContext().actorOf(Props.create(DataConsumerActorForReportRule.class),"report_"+this.rrid);
		}else{
			dataConsumerActor = this.getContext().actorOf(Props.create(DataConsumerActorForColumnDiffRule.class),"recon_"+this.rrid);
		}
		//dataEvaluatorActor = this.getContext().actorOf(Props.create(DataEvaluatorActorForReportRule.class).withDispatcher("worker-dispatcher").withRouter(new RoundRobinPool(this.numberOfWorkers)), "workerRouter_"+this.rrid);
		//dataEvaluatorActor = this.getContext().actorOf(Props.create(DataEvaluatorActor.class).withDispatcher("worker-dispatcher").withRouter(new RoundRobinPool(this.numberOfWorkers)), "workerRouter_"+this.rrid);
		dataEvaluatorActor = this.getContext().actorOf(FromConfig.getInstance().props(), "workerRouter_"+this.rrid);
		System.out.println(">>>> dataEvaluatorActor >>> "+this.getContext().children());
		//dummyMaster = new Object();
		//workerRouter = this.getContext().actorOf(Props.create(Worker.class).withRouter(new RoundRobinRouter(this.nrOfInstances)), "workerRouter_"+this.rrid);
		//workerRouter = this.getContext().actorOf(new RoundRobinPool(this.numberOfWorkers).props(Props.create(Worker.class)), "workerRouter_"+this.rrid);
		//workerRouter = getContext().actorOf(Props.create(Worker.class).withDispatcher("defaultDispatcher").withRouter(new RoundRobinPool(this.numberOfWorkers)), "workerRouter_"+this.rrid);
		//workerRouter = this.getContext().actorOf(new RandomPool(this.nrOfInstances).props(Props.create(Worker.class)), "workerRouter");
	}

	/* @Override
    public void preStart() {
    	try{
    		processMessages(this.context_ );
    	}catch(Exception e){

    	}
    	getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS));
    }*/

	@Override
	public void onReceive(Object message) throws IOException {

		//System.out.println("33333333333333333333333=== "+message);
		if (message instanceof ICEEngineContext) {
			//time.start();
			
			System.out.println("===========> processing context...............");
			ICEEngineContext _context = (ICEEngineContext )message;
			dataConsumerActor.tell(_context, getSelf());
			getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS));
			
		}else if(message instanceof RowData){
			
			actorSentCounterToDataEvaluatorActor++;
			RowData rowData = (RowData)message;
			//System.out.println("Frontend Data >>>> "+ArrayUtils.toString(rowData.getSrc_()));
			if(actorSentCounterToDataEvaluatorActor%100000==0)
				System.out.print(" Kunal_ICE ");
			rowData.setResult(this.result);
			try{
				dataEvaluatorActor.tell(rowData, getSelf());
			}catch (Error e) {
				e.printStackTrace();
				// TODO: handle exception
			}catch (Exception ee) {
				ee.printStackTrace();
				// TODO: handle exception
			}
		}else if (message instanceof Result) {

			//System.out.println(" >>>>>>>>>>>>> COUNT >>>> "+actorCompletedCounter);
			Result result = ( Result )message;
			//System.out.println(" workerIndex >>>>>>>>>>>> "+result.getWorkerIndex()+" , processDataCount >>> "+result.getNoOfRecordsProcessed());
			if(( ++actorCompletedCounter == actorSentCounterToDataEvaluatorActor) && dataConsumerInspectorFlag )
			{
				System.out.println("name>> "+getSelf().path().name()+" =========== Stop our actor hierarchy ================= actorCompletedCounter >> "+actorCompletedCounter+" , actorSentCounterToDataEvaluatorActor >> "+actorSentCounterToDataEvaluatorActor);
				// Stop our actor hierarchy
				//context_.close();

				getContext().stop( getSelf() );
				
				/*Cluster.get(getContext().system()).registerOnMemberRemoved(new Runnable() {
					@Override
					public void run() {
						// exit JVM when ActorSystem has been terminated
						final Runnable exit = new Runnable() {
							@Override public void run() {
								System.exit(0);
							}
						};
						getContext().system().registerOnTermination(exit);

						// shut down ActorSystem
						getContext().system().terminate();

						// In case ActorSystem shutdown takes longer than 10 seconds,
						// exit the JVM forcefully anyway.
						// We must spawn a separate thread to not block current thread,
						// since that would have blocked the shutdown of the ActorSystem.
						new Thread() {
							@Override public void run(){
								try {
									Await.ready(getContext().system().whenTerminated(), Duration.create(5, TimeUnit.SECONDS));
								} catch (Exception e) {
									System.exit(-1);
								}

							}
						}.start();
					}
				});*/
				
				
				//getContext().stop(evaluateExpressionActor);
				//getContext().system().shutdown();
				System.out.println(" Done");
				end = System.nanoTime();
				System.out.println(" Total time >>> "+TimeUnit.SECONDS.convert(TimeUnit.MILLISECONDS.convert((end - start), TimeUnit.NANOSECONDS),TimeUnit.MILLISECONDS));
			}
			//actorCompletedCounter++;

		}else if (message instanceof DataConsumerInspector) {
			
			dataConsumerInspectorFlag = true;
			DataConsumerInspector status = (DataConsumerInspector) message;
			this.result.setDataConsumerInspectorFlag(status.isDataConsumerInspectorFlag());
			status.setResult(result);
			//dataEvaluatorActor.tell(status, getSelf());
			System.out.println(" >>>>>>>>>>>>>>>> result : "+this.result.isDataConsumerInspectorFlag()+" , actorSentCounterToDataEvaluatorActor >> "+actorSentCounterToDataEvaluatorActor+" , actorCompletedCounter >> "+actorCompletedCounter);
			
		} else {
			//System.out.println("===============> Final else unhandled message");
			unhandled(message);
		}
	}
	

}
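`Master` forwards every `RowData` to the router as fast as the file can be read, so at 2 million rows the remote send buffers and the serializer see the whole backlog at once. The usual remedy is back-pressure, e.g. a work-pulling protocol where routees request rows. A plain-Java sketch of bounding in-flight messages with a `Semaphore` (all names hypothetical; in `Master` the acquire would wrap the `tell` and the release would happen on each `Result`):

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative back-pressure sketch: allow at most maxInFlight unacknowledged
// rows, acquiring a permit before each send and releasing it on each Result.
public class BoundedDispatcher {
    private final Semaphore inFlight;
    private final AtomicLong sent = new AtomicLong();

    public BoundedDispatcher(int maxInFlight) {
        this.inFlight = new Semaphore(maxInFlight);
    }

    // Called where Master currently does dataEvaluatorActor.tell(rowData, ...).
    public void send(Runnable tell) throws InterruptedException {
        inFlight.acquire(); // blocks once maxInFlight rows are unacknowledged
        sent.incrementAndGet();
        tell.run();
    }

    // Called where Master currently handles a Result message.
    public void acknowledge() {
        inFlight.release();
    }

    public long sentCount() { return sent.get(); }

    public static long demo() throws InterruptedException {
        BoundedDispatcher d = new BoundedDispatcher(2);
        d.send(() -> {});
        d.send(() -> {});
        d.acknowledge();  // one Result came back, freeing a slot
        d.send(() -> {}); // would block here if no Result had arrived
        return d.sentCount();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints 3
    }
}
```

With a bound of a few thousand, the file reader can never outrun the workers, instead of queuing 2 million serialized `RowData` messages at once.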
