I have Actor system Called organization 

Message flow is organization to 
business—>payroll—>tax—>taxAggregator—>businessEntityAggregator—>OrganizationAgrregator—->fileActor


public class OrganizationApplication {


public static void main(String[] args) throws Exception {

final ActorSystem orgSystem = ActorSystem.create(“Organization”);

         ActorRef organizationMaster = orgSystem
.actorOf(Props.create(OrganizationMaster.class));

         organizationMaster.tell(organizationId, null);



}

}


public class OrganizationMaster extends AbstractActor {

private static final Logger logger = 
LoggerFactory.getLogger(OrganizationMaster.class);

public static Props props() {

return Props.create(OrganizationMaster.class);

}


public OrganizationMaster() {


}


@Override

public void preStart() throws Exception {

instanceId = RandomStringUtils.randomAlphanumeric(6);

jobTrackingActorID = "jobTrackingActor-" + instanceId;

jobTrackingActor = 
getContext().system().actorOf(Props.create(JobTrackingActor.class),
jobTrackingActorID);

super.preStart();

}

@Override

public Receive createReceive() {

return receiveBuilder().match(String.class, orgID -> {

logger.info(">>>>OrganizationMaster job received:" + orgID);


ActorRef organizationAggregatorActor = getContext()

.actorOf(Props.create(OrganizationAggregator.class, payrollContext
.getApplicationArea(),

organization, businessDTOs, siteIdList, jobTrackingServices, orgID, 
jobTrackingActor));

organizationAggregatorActor.tell(organization, getSelf());



List<BusinessID> businessID ;/where we are get List of BusinessEnity from 
Mongodb

for (BusinessEntity business : businessEntities) {

ActorRef businessEntity = 
getContext().actorOf(Props.create(BusinessEntityActor.class,

organizationAggregatorActor, business.getPayrollRun(), business, 
businessDTOs, rollup));

businessEntity.tell(business, getSelf());

}

logger.info("<<<<<<Organisation Master job assignment done");

}).build();

}







public class BusinessEntityActor extends AbstractActor {

private static final Logger logger = 
LoggerFactory.getLogger(BusinessEntityActor.class);

private boolean rollup;




ActorRef businessEntityAggregator;

ActorRef payrollRunActor = getContext().actorOf(new RoundRobinPool(
payrollRuns.size())

.props(Props.create(PayrollRunActor.class, businessEntityAggregator, 
businessEntity, businessDTOs)));


public static Props props() {

return Props.create(BusinessEntityActor.class);

}


@Override

public void preStart() throws Exception {

instanceId = RandomStringUtils.randomAlphanumeric(6);

businessEntityAggregatorId = "businessEntityAggregator-" + instanceId;

businessEntityAggregator = getContext().actorOf(

Props.create(BusinessEntityAggregator.class, organizationAggregatorActor, 
businessEntity),

businessEntityAggregatorId);

super.preStart();

}



@Override

public Receive createReceive() {

return receiveBuilder().match(BusinessEntity.class, businessEntity -> {

logger.info(">>>>>>Business Entity job received:" + businessEntity.getId());

if (rollup) {

logger.info("Rollingup business entities.");

businessEntityAggregator.tell(businessEntity, getSelf());

} else {

for (PayrollRun payrollRun : payrollRuns) {

ActorRef payrollRunActor = getContext().actorOf(new RoundRobinPool(
payrollRuns.size())

.props(Props.create(PayrollRunActor.class, businessEntityAggregator, 
businessEntity, businessDTOs)));

payrollRunActor.tell(payrollRun, getSelf());

}

}


logger.info("<<<<<<<Business Entity assignment done");


}).match(PayrollRun.class, maxPaydatePayrollRun -> {

ActorRef payrollRunActor = getContext()

.actorOf(Props.create(PayrollRunActor.class, businessEntityAggregator, 
businessEntity, businessDTOs));

payrollRunActor.tell(maxPaydatePayrollRun, getSelf());

}).build();

}

}



public class PayrollRunActor extends AbstractActor {

private static final Logger logger = 
LoggerFactory.getLogger(PayrollRunActor.class);



public static Props props() {

return Props.create(PayrollRunActor.class);

}


ActorRef businessEntityAggregator;



@Override

public Receive createReceive() {

return receiveBuilder().match(PayrollRun.class, payrollRun -> {

logger.info(">>>>>PayrollRun job received:" + payrollRun.getPayDate());

//final ActorSystem taxSummarySystem = ActorSystem.create("taxSummary");

ActorRef taxSummary = getContext()

.actorOf(Props.create(TaxSummaryActor.class, businessEntityAggregator, 
businessEntity, payrollRun, businessDTOs));

taxSummary.tell(payrollRun.getTaxSummary(), getSelf());

logger.info("<<<<<<<PayrollRun actor job is done");

}).build();

}


}



public class TaxSummaryActor extends AbstractActor {

private static final Logger logger = 
LoggerFactory.getLogger(TaxSummaryActor.class);



@Override

public Receive createReceive() {

return receiveBuilder().match(TaxSummary.class, taxSummary -> {

logger.info("TaxSummary request received.");

//final ActorSystem taxsummaryAggregatorSystem = 
ActorSystem.create("taxsummaryAggregator");

ActorRef taxsummaryAggregatorActor = getContext().actorOf(

Props.create(TaxSummaryAggregrator.class, businessEntityAggregator, 
businessEntity, payrollRun, businessDTOs));

taxsummaryAggregatorActor.tell(taxSummary, getSelf());

logger.info("TaxSummary job is done");

}).build();

}


}



public class TaxSummaryAggregrator extends AbstractActor {


private static final Logger logger = 
LoggerFactory.getLogger(TaxSummaryAggregrator.class);


private ActorRef businessEntityAggregator;

private BusinessEntity businessEntity;

private PayrollRun payrollRun;

private List<BusinessDTO> businessDTOs;


public TaxSummaryAggregrator(ActorRef businessEntityAggregator, 
BusinessEntity businessEntity,

PayrollRun payrollRun, List<BusinessDTO> businessDTOs) {

this.businessEntity = businessEntity;

this.businessEntityAggregator = businessEntityAggregator;

this.payrollRun = payrollRun;

this.businessDTOs = businessDTOs;

}


@Override

public Receive createReceive() {

return receiveBuilder().match(TaxSummary.class, taxsummary -> {

logger.info("TaxSummaryAggregator request received.");

List<String> records = generatePeriodicData(payrollRun);

businessEntityAggregator.tell(records, getSelf());

logger.info("TaxSummaryAggregator job is done");

}).build();

}


public class BusinessEntityAggregator extends AbstractActor {

private static final Logger logger = 
LoggerFactory.getLogger(BusinessEntityAggregator.class);


private List<String> records = new LinkedList<>();

private int payrollRunCount = 0;

private BusinessEntity businessEntity;

private ActorRef organizationAggregatorActor;

private List<PayrollRun> payrollRuns = null;


public static Props props() {

return Props.create(BusinessEntityAggregator.class);

}


public BusinessEntityAggregator(ActorRef organizationAggregatorActor, 
BusinessEntity businessEntity) {

this.organizationAggregatorActor = organizationAggregatorActor;

this.businessEntity = businessEntity;

}


@Override

public Receive createReceive() {

return receiveBuilder().match(BusinessEntity.class, rollupBusinessEntity -> 
{

logger.info(">>>>>>Business Aggreatator  job received :");

organizationAggregatorActor.tell(maxPaydatePayrollRun, getSelf());

logger.info(">>>>>>Business Aggreatator  job completed:");


}).match(List.class, data -> {

logger.info(">>>>>>Business Aggreatator  job received");


organizationAggregatorActor.tell(businessDTO, getSelf());

}

}).build();

}




public class OrganizationAggregator extends AbstractActor {

private static final Logger logger = 
LoggerFactory.getLogger(OrganizationAggregator.class);

//final ActorSystem fileWriterActorSystem = 
ActorSystem.create("fileWriterActor");


public OrganizationAggregator(ApplicationArea applicationArea, Organization 
organization, List<BusinessDTO> businessDTOs, Set<String> siteIdList,

JobTrackingService jobTrackingServices, String orgId, ActorRef 
jobTrackingActor) {

this.organization = organization;

this.applicationArea = applicationArea;

this.orgId = orgId;

this.businessDTOs = businessDTOs;

this.siteIdList = siteIdList;

this.jobTrackingServices = jobTrackingServices;

this.jobTrackingActor = jobTrackingActor;

}


public static Props props() {

return Props.create(OrganizationAggregator.class);

}


@SuppressWarnings("unchecked")

@Override

public Receive createReceive() {

return receiveBuilder().match(Organization.class, organization -> {

logger.info(">>>>>>organization Aggregation job received:" + organization
.getId());

logger.info("<<<<<<<Organization aggregation done");

}).match(BusinessDTO.class, businessDTO -> {

ActorRef fileActor = getContext().actorOf(Props.create(FileActor.class));

fileActor.tell(recordsBySiteId, getSelf());


}).build();

}

}





public class FileActor extends AbstractActor {

private static final Logger logger = 
LoggerFactory.getLogger(FileWriterActor.class);



@Override

public Receive createReceive() {

return receiveBuilder().match(List.class, this::receiveMessage)

.matchAny(o -> logger.info("<<<<<<< Received unknown message!!!!")).build();

}


private void receiveMessage(List<String> data) throws IOException {

write the records in to file

}


}



My test Class





@RunWith(PowerMockRunner.class)

@PrepareForTest(OrganizationMaster.class)

public class OrganizationMasterTestInitializer {

private String orgId = "1234fcvbng";


private boolean rollup = false;

Mongoclient mongoClient = PowerMockito.mock(Mongoclient.class);

public static final long DEFAULT_TIMEOUT = 5 * 60000L;

static {

        System.setProperty("test.timeout", Long.toString(DEFAULT_TIMEOUT));

    }

@BeforeClass

public static void setup() {

}


@AfterClass

public static void teardown() {

//system.terminate();

}




@Test

public void testOrganizationMaster() throws Exception {

ClassLoader classLoader = getClass().getClassLoader();

ObjectMapper mapper = new ObjectMapper();

final TestKit teskit=new TestKit(ActorSystem.apply());

  

BufferedReader bufferedReaderPayrollRun= new BufferedReader(new FileReader(
classLoader.getResource(“organization.json").getFile()));

PayrollContext payrollcontext=mapper.readValue(bufferedReaderPayrollRun, 
PayrollContext.class);

PowerMockito.whenNew(EmploymentTaxMongoClientService.class
).withNoArguments().thenReturn(mongoClient); PowerMockito.when(mongoClient
.getPeriodicPayrollContext(anyString())).thenReturn(payrollcontext);

BufferedReader cctConfigReader = new BufferedReader(

new FileReader(classLoader.getResource("cctconfiguration.json").getFile()));

Map<String, Object> cctConfig = mapper.readValue(cctConfigReader, Map.class)
;

List<Object> elements = (List<Object>)cctConfig.get("elements");

Map<String, Object> orgConfig = (Map<String, Object>)elements.get(0);

CCProfile cClient = PowerMockito.mock(CCProfile.class);

PowerMockito.whenNew(CCProfile.class).withNoArguments().thenReturn(cClient);

PowerMockito.when(cClient.getClientProfileAsMap(anyString())).thenReturn(
orgCfg);

TestActorRef<OrganizationMaster> organizationMasterRef = 
TestActorRef.create(teskit.system(),Props.create(OrganizationMaster.class));

organizationMasterRef.tell(orgId, ActorRef.noSender());

}

}



Now am getting an Dead letter 


 [default-akka.actor.default-dispatcher-4] 
[akka://default/user/$$a/$b/businessEntityAggregator-IJZZmj] Message 
[java.util.LinkedList] from 
Actor[akka://default/user/$$a/$b/$a/$a/$a/$a#1729850332] to 
Actor[akka://default/user/$$a/$b/businessEntityAggregator-IJZZmj#112830547] 
was not delivered. [1] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.

[default-akka.actor.default-dispatcher-4] 
[akka://default/user/$$a/$b/businessEntityAggregator-IJZZmj] Message 
[java.util.LinkedList] from 
Actor[akka://default/user/$$a/$b/$a/$b/$a/$a#-2101218177] to 
Actor[akka://default/user/$$a/$b/businessEntityAggregator-IJZZmj#112830547] 
was not delivered. [2] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.


THis is blocking the flow and file is not created 


Can you please help me  to resolve  above issue and any suggestion to 
change actor creation 



Thanks in advance

 





-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to