Hi,
My application uses an Akka cluster with one master node and two child
seed nodes. The master node reads data from an input file and sends it to
both child nodes for evaluation (processing).
The application works fine for a small data file, e.g. a file with 43 rows,
but it fails when the input file is huge, e.g. 2 million rows.
The exception and its stack trace are given below.
I have also attached the configuration file and the relevant code. Please
take a look and tell me where I am going wrong.
Thanks in advance.
WARN [18:48:19.013]{iCEDQApp-akka.actor.default-dispatcher-22}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using the default Java serializer for class [org.iceengine.compare.akka.RowData] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
WARN [18:48:21.768]{iCEDQApp-akka.actor.default-dispatcher-28}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using the default Java serializer for class [org.iceengine.compare.akka.Result] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
WARN [18:48:21.813]{iCEDQApp-akka.actor.default-dispatcher-4}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using the default Java serializer for class [org.iceengine.compare.akka.Result] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
WARN [18:48:23.002]{iCEDQApp-akka.actor.default-dispatcher-3}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Cluster Node [akka.tcp://iCEDQApp@192.168.100.199:2551] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://iCEDQApp@192.168.100.199:62915, status = Up)]. Node roles [backend]
WARN [18:48:23.058]{iCEDQApp-akka.actor.default-dispatcher-17}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Cluster Node [akka.tcp://iCEDQApp@192.168.100.199:62915] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://iCEDQApp@192.168.100.199:2551, status = Up)]. Node roles []
Kunal_ICE
ERROR[18:48:23.473]{iCEDQApp-akka.actor.default-dispatcher-24}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$1.apply$mcV$sp:70)-AssociationError [akka.tcp://iCEDQApp@192.168.100.199:2552] <- [akka.tcp://iCEDQApp@192.168.100.199:62915]: Error [null] [
java.io.OptionalDataException
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1373)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at java.util.HashMap.readObject(HashMap.java:1402)
    at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:304)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:304)
    at akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:151)
    at akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:137)
    at scala.util.Try$.apply(Try.scala:192)
    at akka.serialization.Serialization.deserialize(Serialization.scala:131)
    at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:80)
    at akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:151)
    at akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:137)
    at scala.util.Try$.apply(Try.scala:192)
    at akka.serialization.Serialization.deserialize(Serialization.scala:131)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:30)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:64)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:64)
    at akka.remote.DefaultMessageDispatcher.msgLog$1(Endpoint.scala:69)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:81)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:988)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:496)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
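For context on the trace above: an OptionalDataException raised from HashMap.readObject means the stream did not contain what the map's serialized form announced. One commonly reported cause (an assumption here, nothing in this post confirms it) is a map that one thread modifies while another thread is still serializing it, which can happen when a message is changed after it has been sent to a remote actor. The sketch below is plain Java with no Akka dependency. RowMessage and roundTrip are hypothetical names, not classes from the application. It shows a message that copies its map on construction and survives a Java-serialization round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SerializationRoundTrip {

    /** Hypothetical stand-in for a message such as RowData; not the real class. */
    static final class RowMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        private final HashMap<String, String> columns;

        RowMessage(Map<String, String> columns) {
            // Defensive copy: later changes to the caller's map cannot
            // race with serialization of this message.
            this.columns = new HashMap<>(columns);
        }

        Map<String, String> getColumns() {
            return Collections.unmodifiableMap(columns);
        }
    }

    /** Serializes a value with plain Java serialization and reads it back. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> source = new HashMap<>();
        source.put("id", "42");
        RowMessage message = new RowMessage(source);
        source.put("late", "change"); // does not reach the already-built message
        RowMessage copy = roundTrip(message);
        System.out.println(copy.getColumns());
    }
}
```

Running the same round trip on the real message classes outside the cluster would show whether they serialize cleanly on their own, which helps separate a class-level problem from a concurrency problem.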
*Front End Class*
=======================
ActorSystem system = ActorSystem.create("iCEDQApp", ConfigFactory.load());
System.out.println("IceCompareEngine ============ >>>>>> " + context_._ruleType);
ClusterRegisterOnMemberUp registerUp = new ClusterRegisterOnMemberUp(
        actors, context_.getRiid(), context_, system, context_._ruleType);
FutureTask<ActorRef> futureTask = new FutureTask<ActorRef>(registerUp);
// ExecutorService executor = Executors.newFixedThreadPool(1);
// executor.execute(futureTask);
Cluster.get(system).registerOnMemberUp(futureTask);
try {
    // Block until the task completes instead of spinning on isDone().
    futureTask.get();
    System.out.println(">>>>>>>>>>>>>>>>>> done >>>>>>>>>>>>>> ");
} catch (Exception e) {
    e.printStackTrace();
}
*Configuration File*
=======================
include "application"
# //#min-nr-of-members
akka.cluster.min-nr-of-members =2
# //#min-nr-of-members
# //#role-min-nr-of-members
akka.cluster.role {
#frontend.min-nr-of-members = 1
backend.min-nr-of-members = 2
}
# //#role-min-nr-of-members
# //#adaptive-router
akka.actor.deployment {
"/*/*" {
# Router type provided by metrics extension.
router = round-robin-pool
dispatcher = worker-dispatcher
routees.paths = ["/user/expEvaluationBackend"]
nr-of-instances = 100
cluster {
enabled = on
use-role = backend
max-nr-of-instances-per-node = 3
allow-local-routees = off
}
}
}
worker-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 64
}
throughput = 5
}
akka {
# Loggers to register at boot time (akka.event.Logging$DefaultLogger logs
# to STDOUT)
loggers = ["akka.event.slf4j.Slf4jLogger"]
# Log level used by the configured loggers (see "loggers") as soon
# as they have been started; before that, see "stdout-loglevel"
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = "DEBUG"
# Log level for the very basic logger activated during ActorSystem startup.
# This logger prints the log messages to stdout (System.out).
# Options: OFF, ERROR, WARNING, INFO, DEBUG
stdout-loglevel = "ERROR"
# Filter of log events that is used by the LoggingAdapter before
# publishing log events to the eventStream.
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor.provider = "akka.cluster.ClusterActorRefProvider"
remote {
log-remote-lifecycle-events = on
log-sent-messages = on
log-received-messages = on
netty.tcp {
hostname = "192.168.100.199"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://iCEDQApp@192.168.100.199:2551",
"akka.tcp://iCEDQApp@192.168.100.199:2552"]
#auto-down-unreachable-after = 10s
}
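# NOTE: inside this akka { } block the two "akka.cluster..." keys below resolve
# to akka.akka.cluster.*, so as written they have no effect.
akka.cluster.min-nr-of-members =3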
# //#min-nr-of-members
# //#role-min-nr-of-members
akka.cluster.role {
frontend.min-nr-of-members = 1
backend.min-nr-of-members = 2
}
actor.deployment {
"/*/*" {
# Router type provided by metrics extension.
#router = cluster-metrics-adaptive-group
router = round-robin-group
# Router parameter specific for metrics extension.
# metrics-selector = heap
# metrics-selector = load
# metrics-selector = cpu
metrics-selector = mix
#
routees.paths = ["/user/expEvaluationBackend"]
nr-of-instances = 100
cluster {
enabled = on
use-role = backend
max-nr-of-instances-per-node = 3
allow-local-routees = off
}
}
}
# Disable legacy metrics in akka-cluster.
cluster.metrics.enabled=off
# Enable metrics extension in akka-cluster-metrics.
extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
}
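As a side note on the worker-dispatcher settings above: for a fork-join-executor, Akka derives the parallelism as ceil(available processors * parallelism-factor), bounded below by parallelism-min and above by parallelism-max. A minimal sketch of that arithmetic follows. The scaledPoolSize helper is written here for illustration only and is not a call into Akka.

```java
public class ForkJoinParallelism {

    /** Returns ceil(cores * factor), clamped to the range [min, max]. */
    static int scaledPoolSize(int min, double factor, int max, int cores) {
        int scaled = (int) Math.ceil(cores * factor);
        return Math.min(Math.max(scaled, min), max);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Values taken from the worker-dispatcher block: min 2, factor 2.0, max 64.
        System.out.println("cores=" + cores
                + " parallelism=" + scaledPoolSize(2, 2.0, 64, cores));
    }
}
```

With these settings a 4-core machine would get 8 threads for the dispatcher, and the cap of 64 only applies from 32 cores upward.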
*ClusterRegisterOnMemberUp.java*
=======================
package org.iceengine.compare.akka;
import java.util.concurrent.Callable;
import org.iceengine.compare.engine.ICEEngineContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
public class ClusterRegisterOnMemberUp implements Callable<ActorRef>{
private int noOfInstance;
private long riid;
private ICEEngineContext context;
private ActorSystem system;
private String _ruleType;
public ClusterRegisterOnMemberUp(int noOfInstance, long riid, ICEEngineContext context, ActorSystem system, String _ruleType) {
super();
this.noOfInstance = noOfInstance;
this.riid = riid;
this.context = context;
this.system = system;
this._ruleType = _ruleType;
}
@Override
public ActorRef call() throws Exception{
// ActorRef master = null;
// if("report".equalsIgnoreCase(_ruleType))
// master = system.actorOf(Props.create(MasterForReportRule.class,noOfInstance, riid), "master_"+riid);
// else if("recon".equalsIgnoreCase(_ruleType))
// master = system.actorOf(Props.create(MasterForColumnDiffRule.class,noOfInstance, riid), "master_"+riid);
ActorRef master = system.actorOf(Props.create(Master.class,noOfInstance, riid,_ruleType), "master_"+riid);
master.tell(context, ActorRef.noSender());
while(!master.isTerminated()){
try{
Thread.sleep(100);
}catch(Exception e){
e.printStackTrace();
}
}
return master;
//_system.actorOf(Props.create(ExpEvaluationFrontend.class, fileName, nrOfInstances,nrOfExpression),"expEvaluationFrontend");
}
}
*DataEvaluatorActor.java*
=======================
package org.iceengine.compare.akka;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import org.apache.commons.lang.ArrayUtils;
import org.iceengine.compare.engine.CommonRowObj;
import org.iceengine.compare.engine.ICECompare;
import org.iceengine.compare.engine.ICECompareEngine;
import org.iceengine.compare.engine.ICECompareRow;
import org.iceengine.compare.engine.ICEComparison;
import org.iceengine.compare.engine.ICEEngineContext;
import org.iceengine.compare.engine.SRCTRGSide;
import org.iceengine.compare.engine.WritterInterface;
import akka.actor.UntypedActor;
public class DataEvaluatorActor extends UntypedActor {
//private final Object dummyMaster = new Object();
private ScriptEngine gEngine;
DataEvaluatorActor(){
ScriptEngineManager manager = new ScriptEngineManager();
gEngine = manager.getEngineByName("groovy");
System.out.println("111111111111111111111111111111111111111111111");
}
@Override
public void onReceive(Object message) {
//System.out.println("============ onreceive worker ===================");
if( message instanceof RowData )
{
RowData rowData = ( RowData )message;
try{
if("report".equalsIgnoreCase(rowData.getContext()._ruleType)){
new ICECompareEngine().compareReportRuleExprGroovy(rowData.getSrc_(), rowData.getLhsColumns()
,rowData.getRhsColumns(),rowData.getReplacedColumnsMap(),rowData.getExpressions()
,rowData.getExpressions_Groovy(),rowData.getExpressions_Org(),rowData.getContext()
,rowData.isDisplayRecord(),rowData.isOneSided(),rowData.isJoinFailed()
,rowData.getRuleType(), rowData.getSrcBindings(),gEngine
,rowData.getOneSide(), rowData.getErrorInfo());
}else{
if(rowData.isOneSided()){
this.writeRowError(rowData.getSrc_(), rowData.getOneSide(), rowData.getContext(), rowData.getContext()._sink
, "", rowData.isRecordRow(), ((rowData.getOneSide() == SRCTRGSide.LEFT_INDEX) ? rowData.getLhsErrorInfo() : rowData.getRhsErrorInfo()), rowData.isOneSided());
}else{
if(rowData.getComparison() < 0){
this.writeRowError(rowData.getSrc_(), SRCTRGSide.LEFT_INDEX, rowData.getContext(), rowData.getContext()._sink
, "", rowData.isRecordRow(), rowData.getLhsErrorInfo(), rowData.isJoinFailed());
}else if(rowData.getComparison() > 0){
this.writeRowError(rowData.getTrg_(), SRCTRGSide.RIGHT_INDEX, rowData.getContext(), rowData.getContext()._sink
, "", rowData.isRecordRow(), rowData.getRhsErrorInfo(), rowData.isJoinFailed());
}else{
new ICECompareEngine().compareColumnDiffRuleExprGroovy(rowData.getSrc_(), rowData.getTrg_(), rowData.getLhsColumns()
,rowData.getRhsColumns(),rowData.getReplacedColumnsMap(),rowData.getExpressions()
,rowData.getExpressions_Groovy(),rowData.getExpressions_Org(),rowData.getContext()
,rowData.isDisplayRecord(),rowData.isOneSided(),rowData.isJoinFailed()
,rowData.getRuleType(), rowData.getSrcBindings(),gEngine
,rowData.getOneSide(), rowData.getLhsErrorInfo(),rowData.getRhsErrorInfo());
}
}
}
}catch (Exception e) {
// Print the failure instead of silently swallowing it
e.printStackTrace();
}finally{
//Result result = new Result();
getSender().tell( rowData.getResult(), getSelf() );
}
}else if (message instanceof DataConsumerInspector) {
DataConsumerInspector status = (DataConsumerInspector) message;
getSender().tell( status.getResult(), getSelf() );
}
else{
//System.out.println("======================= unhandled message worker");
unhandled(message);
}
}
private void writeRowError(Object[] row_, int sideIdx_, ICEEngineContext context_,
WritterInterface sink_, String exprResult, boolean recordRow, String errorInfo, boolean oneSided) throws IOException {
ICECompare.CompareType compareType = context_._dataComparison.getCompareType();
if (compareType == ICECompare.CompareType.COLUMN_DIFF)
return;
if (row_ == null)
return;
ICEComparison dataComparison = context_.getDataComparison();
long rowStep = context_.getRowStep();
SRCTRGSide side = SRCTRGSide.getEnumForConstant(sideIdx_);
//System.out.println("======> 11111 icecomparerow row::: "+ArrayUtils.toString(row_));
ICECompareRow rowDiff = new ICECompareRow(rowStep, row_, side, dataComparison, exprResult, recordRow, errorInfo, oneSided );
sink_.writeLog(rowDiff, context_);
}
private void writeCommon(Object[] lrow_, Object[] rrow_, int lsideIdx_, int rsideIdx_, ICEEngineContext context_,
WritterInterface sink_, String exprResult, boolean recordRow, String lhsErrorInfo, String rhsErrorInfo, boolean recordFinalExpression) throws IOException {
ICECompare.CompareType compareType = context_._dataComparison.getCompareType();
if (compareType == ICECompare.CompareType.COLUMN_DIFF)
return;
if (lrow_ == null && rrow_ == null)
return;
ICEComparison dataComparison = context_.getDataComparison();
long rowStep = context_.getRowStep();
SRCTRGSide lside = SRCTRGSide.getEnumForConstant(lsideIdx_);
SRCTRGSide rside = SRCTRGSide.getEnumForConstant(rsideIdx_);
CommonRowObj commonRow = new CommonRowObj(rowStep, lrow_, rrow_, lside, rside, dataComparison
,exprResult, recordRow, lhsErrorInfo, rhsErrorInfo, recordFinalExpression);
sink_.writeLog(commonRow, context_);
}
}
*Main.java*
=======================
package com.ice.test;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.iceengine.compare.akka.DataEvaluatorActor;
import org.iceengine.compare.akka.MetricsListener;
import org.iceengine.compare.conf.ICEEngine;
import org.iceengine.compare.engine.ICEEngineContext;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.routing.FromConfig;
import akka.routing.RoundRobinPool;
public class Main {
public static void main(String args[]) throws InterruptedException
{
System.out.println(" >>>> Cores >>> "+Runtime.getRuntime().availableProcessors());
//ActorSystem system = ICEEngineContext.instance();
Main.backendRun(new String[] { "2551","192.168.100.199" });
Main.backendRun(new String[] { "2552","192.168.100.199" });
Main.run(27256);
/*new Thread(new Runnable() {
@Override
public void run() {
Main.run(27256);
}
}).start();*/
/*new Thread(new Runnable() {
@Override
public void run() {
Main.run(27250);
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
Main.run(27230);
}
}).start();*/
}
public static void backendRun(String args[])
{
final String port = args.length > 0 ? args[0] : "0";
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
//withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.host="+hostname)).
withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
withFallback(ConfigFactory.load("ExpressionEvaluation"));
// final Config config = ConfigFactory.parseString("akka.cluster.roles = [backend]").
// withFallback(ConfigFactory.load("ExpressionEvaluation"));
ActorSystem system = ActorSystem.create("iCEDQApp", config);
System.out.println("\t\t nrOfInstances >>>>>>>>> "+FromConfig.getInstance().routerDispatcher());
//system.actorOf(Props.create(ClientMaster.class), "expEvaluationBackend");
//system.actorOf(Props.create(EvaluateExpressionActor.class), "expEvaluationBackend");
// system.actorOf(new RoundRobinPool(8).props(Props.create(EvaluateExpressionActor.class)), "expEvaluationBackend");
system.actorOf(Props.create(DataEvaluatorActor.class).withDispatcher("worker-dispatcher").withRouter(new RoundRobinPool(2)), "expEvaluationBackend");
//system.actorOf(FromConfig.getInstance().props(Props.create(EvaluateExpressionActor.class)),"expEvaluationBackend");
system.actorOf(Props.create(MetricsListener.class), "metricsListener");
// system.actorOf(
// ClusterSingletonManager.props(
// Master.props(workTimeout),
// PoisonPill.getInstance(),
// ClusterSingletonManagerSettings.create(system).withRole(role)
// ),
// "master");
}
public static void run(int riid){
//long riid =27256;
org.apache.logging.log4j.Logger instanceLog = org.apache.logging.log4j.LogManager.getLogger("ICEApp");
//String str ="C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_27348_plan.xml";
// String str ="C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_Validation_107740_107802.plan.xml";
// String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_27256_plan.xml,C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_27256_lhs.dbConnectionInfo.xml,C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_27256_rhs.dbConnectionInfo.xml";
// String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_172092_172095.plan.xml,C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_172092_172095.lhs.dbConnectionInfo.xml,C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_172092_172095.rhs.dbConnectionInfo.xml";
// String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_Validation_43_Rows.plan.xml"; // 43 Rows Validation rule
// String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_43_Rows.plan.xml";
String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_BigFile Validation_Big_file.plan.xml"; // big input file, Validation rule
// String str = "C:\\file\\fixedfile\\ICERepo_MSSQL_Linux_Dev_ColumnDiff_Big_File_Rule.plan.xml";
try
{
Map<String, Object> domainSummary = null;
domainSummary = ICEEngine.startICEEngine(str, riid, null, instanceLog);
System.out.println(" BPEL RUNRULE summary - start " );
//System.out.println(" Problem Code = "+domainSummary.get("PROBLEMCODE"));
System.out.println(" Problem Msg = "+domainSummary.get("PROBLEMMSG"));
System.out.println(" LHS missing count = "+domainSummary.get("LHSM"));
System.out.println(" RHS missing count = "+domainSummary.get("RHSM"));
System.out.println(" COMMON rows count = "+domainSummary.get("CMN"));
System.out.println(" COLUMN DIFF COUNT = " +domainSummary.get("CDC"));
System.out.println(" Total diff count = "+domainSummary.get("TOTALCOUNT"));
System.out.println(" Source Query Value = "+domainSummary.get("SRCQV"));
System.out.println(" Target Query Value = "+domainSummary.get("TRGQV"));
System.out.println(" Scalar Result = "+domainSummary.get("SCALARRESULT"));
System.out.println(" Final Result = "+domainSummary.get("FRESULT"));
System.out.println(" Final Result By = "+domainSummary.get("FRESULTBY"));
System.out.println(("BPEL RUNRULE summary - end "));
} catch (Exception eee)
{
eee.printStackTrace();
}catch (Error e) {
e.printStackTrace();
// TODO: handle exception
}
}
}
*Master.java*
=======================
package org.iceengine.compare.akka;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.ArrayUtils;
import org.iceengine.compare.engine.ICEEngineContext;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.routing.FromConfig;
import akka.routing.RoundRobinPool;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
public class Master extends UntypedActor {
private int numberOfWorkers;
private long rrid;
//private ActorRef workerRouter;
private final ActorRef dataConsumerActor;
private final ActorRef dataEvaluatorActor;
private long actorCompletedCounter = 0;
private long actorSentCounterToDataEvaluatorActor = 0;
private long start;
private long end;
private Result result;
private boolean dataConsumerInspectorFlag = false;
private String _ruleType;
public Master(int nrOfInstances, long rrid, String _ruleType)
{
this.numberOfWorkers = nrOfInstances;
this.rrid = rrid;
System.out.println("NrOfInstances == "+this.numberOfWorkers);
start = System.nanoTime();
this.result = new Result();
this._ruleType = _ruleType;
if("report".equalsIgnoreCase(_ruleType)){
dataConsumerActor = this.getContext().actorOf(Props.create(DataConsumerActorForReportRule.class),"report_"+this.rrid);
}else{
dataConsumerActor = this.getContext().actorOf(Props.create(DataConsumerActorForColumnDiffRule.class),"recon_"+this.rrid);
}
//dataEvaluatorActor = this.getContext().actorOf(Props.create(DataEvaluatorActorForReportRule.class).withDispatcher("worker-dispatcher").withRouter(new RoundRobinPool(this.numberOfWorkers)), "workerRouter_"+this.rrid);
//dataEvaluatorActor = this.getContext().actorOf(Props.create(DataEvaluatorActor.class).withDispatcher("worker-dispatcher").withRouter(new RoundRobinPool(this.numberOfWorkers)), "workerRouter_"+this.rrid);
dataEvaluatorActor = this.getContext().actorOf(FromConfig.getInstance().props(), "workerRouter_"+this.rrid);
System.out.println(">>>> dataEvaluatorActor >>> "+this.getContext().children());
//dummyMaster = new Object();
//workerRouter = this.getContext().actorOf(Props.create(Worker.class).withRouter(new RoundRobinRouter(this.nrOfInstances)), "workerRouter_"+this.rrid);
//workerRouter = this.getContext().actorOf(new RoundRobinPool(this.numberOfWorkers).props(Props.create(Worker.class)), "workerRouter_"+this.rrid);
//workerRouter = getContext().actorOf(Props.create(Worker.class).withDispatcher("defaultDispatcher").withRouter(new RoundRobinPool(this.numberOfWorkers)), "workerRouter_"+this.rrid);
//workerRouter = this.getContext().actorOf(new RandomPool(this.nrOfInstances).props(Props.create(Worker.class)), "workerRouter");
}
/* @Override
public void preStart() {
try{
processMessages(this.context_ );
}catch(Exception e){
}
getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS));
}*/
@Override
public void onReceive(Object message) throws IOException {
//System.out.println("33333333333333333333333=== "+message);
if (message instanceof ICEEngineContext) {
//time.start();
System.out.println("===========> processing context...............");
ICEEngineContext _context = (ICEEngineContext )message;
dataConsumerActor.tell(_context, getSelf());
getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS));
}else if(message instanceof RowData){
actorSentCounterToDataEvaluatorActor++;
RowData rowData = (RowData)message;
//System.out.println("Frontend Data >>>> "+ArrayUtils.toString(rowData.getSrc_()));
if(actorSentCounterToDataEvaluatorActor%100000==0)
System.out.print(" Kunal_ICE ");
rowData.setResult(this.result);
try{
dataEvaluatorActor.tell(rowData, getSelf());
}catch (Error e) {
e.printStackTrace();
// TODO: handle exception
}catch (Exception ee) {
ee.printStackTrace();
// TODO: handle exception
}
}else if (message instanceof Result) {
//System.out.println(" >>>>>>>>>>>>> COUNT >>>> "+actorCompletedCounter);
Result result = ( Result )message;
//System.out.println(" workerIndex >>>>>>>>>>>> "+result.getWorkerIndex()+" , processDataCount >>> "+result.getNoOfRecordsProcessed());
if(( ++actorCompletedCounter == actorSentCounterToDataEvaluatorActor) && dataConsumerInspectorFlag )
{
System.out.println("name>> "+getSelf().path().name()+" =========== Stop our actor hierarchy ================= actorCompletedCounter >> "+actorCompletedCounter+" , actorSentCounterToDataEvaluatorActor >> "+actorSentCounterToDataEvaluatorActor);
// Stop our actor hierarchy
//context_.close();
getContext().stop( getSelf() );
/*Cluster.get(getContext().system()).registerOnMemberRemoved(new Runnable() {
@Override
public void run() {
// exit JVM when ActorSystem has been terminated
final Runnable exit = new Runnable() {
@Override public void run() {
System.exit(0);
}
};
getContext().system().registerOnTermination(exit);
// shut down ActorSystem
getContext().system().terminate();
// In case ActorSystem shutdown takes longer than 10 seconds,
// exit the JVM forcefully anyway.
// We must spawn a separate thread to not block current thread,
// since that would have blocked the shutdown of the ActorSystem.
new Thread() {
@Override public void run(){
try {
Await.ready(getContext().system().whenTerminated(), Duration.create(5, TimeUnit.SECONDS));
} catch (Exception e) {
System.exit(-1);
}
}
}.start();
}
});*/
//getContext().stop(evaluateExpressionActor);
//getContext().system().shutdown();
System.out.println(" Done");
end = System.nanoTime();
System.out.println(" Total time >>> "+TimeUnit.NANOSECONDS.toSeconds(end - start));
}
//actorCompletedCounter++;
}else if (message instanceof DataConsumerInspector) {
dataConsumerInspectorFlag = true;
DataConsumerInspector status = (DataConsumerInspector) message;
this.result.setDataConsumerInspectorFlag(status.isDataConsumerInspectorFlag());
status.setResult(result);
//dataEvaluatorActor.tell(status, getSelf());
System.out.println(" >>>>>>>>>>>>>>>> result : "+this.result.isDataConsumerInspectorFlag()+" , actorSentCounterToDataEvaluatorActor >> "+actorSentCounterToDataEvaluatorActor+" , actorCompletedCounter >> "+actorCompletedCounter);
} else {
//System.out.println("===============> Final else unhandled message");
unhandled(message);
}
}
}