I have Actor system Called organization
Message flow is organization to
business—>payroll—>tax—>taxAggregator—>businessEntityAggregator—>OrganizationAgrregator—->fileActor
public class OrganizationApplication {
public static void main(String[] args) throws Exception {
final ActorSystem orgSystem = ActorSystem.create(“Organization”);
ActorRef organizationMaster = orgSystem
.actorOf(Props.create(OrganizationMaster.class));
organizationMaster.tell(organizationId, null);
}
}
public class OrganizationMaster extends AbstractActor {
private static final Logger logger =
LoggerFactory.getLogger(OrganizationMaster.class);
public static Props props() {
return Props.create(OrganizationMaster.class);
}
public OrganizationMaster() {
}
@Override
public void preStart() throws Exception {
instanceId = RandomStringUtils.randomAlphanumeric(6);
jobTrackingActorID = "jobTrackingActor-" + instanceId;
jobTrackingActor =
getContext().system().actorOf(Props.create(JobTrackingActor.class),
jobTrackingActorID);
super.preStart();
}
@Override
public Receive createReceive() {
return receiveBuilder().match(String.class, orgID -> {
logger.info(">>>>OrganizationMaster job received:" + orgID);
ActorRef organizationAggregatorActor = getContext()
.actorOf(Props.create(OrganizationAggregator.class, payrollContext
.getApplicationArea(),
organization, businessDTOs, siteIdList, jobTrackingServices, orgID,
jobTrackingActor));
organizationAggregatorActor.tell(organization, getSelf());
List<BusinessID> businessID ;/where we are get List of BusinessEnity from
Mongodb
for (BusinessEntity business : businessEntities) {
ActorRef businessEntity =
getContext().actorOf(Props.create(BusinessEntityActor.class,
organizationAggregatorActor, business.getPayrollRun(), business,
businessDTOs, rollup));
businessEntity.tell(business, getSelf());
}
logger.info("<<<<<<Organisation Master job assignment done");
}).build();
}
public class BusinessEntityActor extends AbstractActor {
private static final Logger logger =
LoggerFactory.getLogger(BusinessEntityActor.class);
private boolean rollup;
ActorRef businessEntityAggregator;
ActorRef payrollRunActor = getContext().actorOf(new RoundRobinPool(
payrollRuns.size())
.props(Props.create(PayrollRunActor.class, businessEntityAggregator,
businessEntity, businessDTOs)));
public static Props props() {
return Props.create(BusinessEntityActor.class);
}
@Override
public void preStart() throws Exception {
instanceId = RandomStringUtils.randomAlphanumeric(6);
businessEntityAggregatorId = "businessEntityAggregator-" + instanceId;
businessEntityAggregator = getContext().actorOf(
Props.create(BusinessEntityAggregator.class, organizationAggregatorActor,
businessEntity),
businessEntityAggregatorId);
super.preStart();
}
@Override
public Receive createReceive() {
return receiveBuilder().match(BusinessEntity.class, businessEntity -> {
logger.info(">>>>>>Business Entity job received:" + businessEntity.getId());
if (rollup) {
logger.info("Rollingup business entities.");
businessEntityAggregator.tell(businessEntity, getSelf());
} else {
for (PayrollRun payrollRun : payrollRuns) {
ActorRef payrollRunActor = getContext().actorOf(new RoundRobinPool(
payrollRuns.size())
.props(Props.create(PayrollRunActor.class, businessEntityAggregator,
businessEntity, businessDTOs)));
payrollRunActor.tell(payrollRun, getSelf());
}
}
logger.info("<<<<<<<Business Entity assignment done");
}).match(PayrollRun.class, maxPaydatePayrollRun -> {
ActorRef payrollRunActor = getContext()
.actorOf(Props.create(PayrollRunActor.class, businessEntityAggregator,
businessEntity, businessDTOs));
payrollRunActor.tell(maxPaydatePayrollRun, getSelf());
}).build();
}
}
public class PayrollRunActor extends AbstractActor {
private static final Logger logger =
LoggerFactory.getLogger(PayrollRunActor.class);
public static Props props() {
return Props.create(PayrollRunActor.class);
}
ActorRef businessEntityAggregator;
@Override
public Receive createReceive() {
return receiveBuilder().match(PayrollRun.class, payrollRun -> {
logger.info(">>>>>PayrollRun job received:" + payrollRun.getPayDate());
//final ActorSystem taxSummarySystem = ActorSystem.create("taxSummary");
ActorRef taxSummary = getContext()
.actorOf(Props.create(TaxSummaryActor.class, businessEntityAggregator,
businessEntity, payrollRun, businessDTOs));
taxSummary.tell(payrollRun.getTaxSummary(), getSelf());
logger.info("<<<<<<<PayrollRun actor job is done");
}).build();
}
}
public class TaxSummaryActor extends AbstractActor {
private static final Logger logger =
LoggerFactory.getLogger(TaxSummaryActor.class);
@Override
public Receive createReceive() {
return receiveBuilder().match(TaxSummary.class, taxSummary -> {
logger.info("TaxSummary request received.");
//final ActorSystem taxsummaryAggregatorSystem =
ActorSystem.create("taxsummaryAggregator");
ActorRef taxsummaryAggregatorActor = getContext().actorOf(
Props.create(TaxSummaryAggregrator.class, businessEntityAggregator,
businessEntity, payrollRun, businessDTOs));
taxsummaryAggregatorActor.tell(taxSummary, getSelf());
logger.info("TaxSummary job is done");
}).build();
}
}
public class TaxSummaryAggregrator extends AbstractActor {
private static final Logger logger =
LoggerFactory.getLogger(TaxSummaryAggregrator.class);
private ActorRef businessEntityAggregator;
private BusinessEntity businessEntity;
private PayrollRun payrollRun;
private List<BusinessDTO> businessDTOs;
public TaxSummaryAggregrator(ActorRef businessEntityAggregator,
BusinessEntity businessEntity,
PayrollRun payrollRun, List<BusinessDTO> businessDTOs) {
this.businessEntity = businessEntity;
this.businessEntityAggregator = businessEntityAggregator;
this.payrollRun = payrollRun;
this.businessDTOs = businessDTOs;
}
@Override
public Receive createReceive() {
return receiveBuilder().match(TaxSummary.class, taxsummary -> {
logger.info("TaxSummaryAggregator request received.");
List<String> records = generatePeriodicData(payrollRun);
businessEntityAggregator.tell(records, getSelf());
logger.info("TaxSummaryAggregator job is done");
}).build();
}
public class BusinessEntityAggregator extends AbstractActor {
private static final Logger logger =
LoggerFactory.getLogger(BusinessEntityAggregator.class);
private List<String> records = new LinkedList<>();
private int payrollRunCount = 0;
private BusinessEntity businessEntity;
private ActorRef organizationAggregatorActor;
private List<PayrollRun> payrollRuns = null;
public static Props props() {
return Props.create(BusinessEntityAggregator.class);
}
public BusinessEntityAggregator(ActorRef organizationAggregatorActor,
BusinessEntity businessEntity) {
this.organizationAggregatorActor = organizationAggregatorActor;
this.businessEntity = businessEntity;
}
@Override
public Receive createReceive() {
return receiveBuilder().match(BusinessEntity.class, rollupBusinessEntity ->
{
logger.info(">>>>>>Business Aggreatator job received :");
organizationAggregatorActor.tell(maxPaydatePayrollRun, getSelf());
logger.info(">>>>>>Business Aggreatator job completed:");
}).match(List.class, data -> {
logger.info(">>>>>>Business Aggreatator job received");
organizationAggregatorActor.tell(businessDTO, getSelf());
}
}).build();
}
public class OrganizationAggregator extends AbstractActor {
private static final Logger logger =
LoggerFactory.getLogger(OrganizationAggregator.class);
//final ActorSystem fileWriterActorSystem =
ActorSystem.create("fileWriterActor");
public OrganizationAggregator(ApplicationArea applicationArea, Organization
organization, List<BusinessDTO> businessDTOs, Set<String> siteIdList,
JobTrackingService jobTrackingServices, String orgId, ActorRef
jobTrackingActor) {
this.organization = organization;
this.applicationArea = applicationArea;
this.orgId = orgId;
this.businessDTOs = businessDTOs;
this.siteIdList = siteIdList;
this.jobTrackingServices = jobTrackingServices;
this.jobTrackingActor = jobTrackingActor;
}
public static Props props() {
return Props.create(OrganizationAggregator.class);
}
@SuppressWarnings("unchecked")
@Override
public Receive createReceive() {
return receiveBuilder().match(Organization.class, organization -> {
logger.info(">>>>>>organization Aggregation job received:" + organization
.getId());
logger.info("<<<<<<<Organization aggregation done");
}).match(BusinessDTO.class, businessDTO -> {
ActorRef fileActor = getContext().actorOf(Props.create(FileActor.class));
fileActor.tell(recordsBySiteId, getSelf());
}).build();
}
}
public class FileActor extends AbstractActor {
private static final Logger logger =
LoggerFactory.getLogger(FileWriterActor.class);
@Override
public Receive createReceive() {
return receiveBuilder().match(List.class, this::receiveMessage)
.matchAny(o -> logger.info("<<<<<<< Received unknown message!!!!")).build();
}
private void receiveMessage(List<String> data) throws IOException {
write the records in to file
}
}
My test Class
@RunWith(PowerMockRunner.class)
@PrepareForTest(OrganizationMaster.class)
public class OrganizationMasterTestInitializer {
private String orgId = "1234fcvbng";
private boolean rollup = false;
Mongoclient mongoClient = PowerMockito.mock(Mongoclient.class);
public static final long DEFAULT_TIMEOUT = 5 * 60000L;
static {
System.setProperty("test.timeout", Long.toString(DEFAULT_TIMEOUT));
}
@BeforeClass
public static void setup() {
}
@AfterClass
public static void teardown() {
//system.terminate();
}
@Test
public void testOrganizationMaster() throws Exception {
ClassLoader classLoader = getClass().getClassLoader();
ObjectMapper mapper = new ObjectMapper();
final TestKit teskit=new TestKit(ActorSystem.apply());
BufferedReader bufferedReaderPayrollRun= new BufferedReader(new FileReader(
classLoader.getResource(“organization.json").getFile()));
PayrollContext payrollcontext=mapper.readValue(bufferedReaderPayrollRun,
PayrollContext.class);
PowerMockito.whenNew(EmploymentTaxMongoClientService.class
).withNoArguments().thenReturn(mongoClient); PowerMockito.when(mongoClient
.getPeriodicPayrollContext(anyString())).thenReturn(payrollcontext);
BufferedReader cctConfigReader = new BufferedReader(
new FileReader(classLoader.getResource("cctconfiguration.json").getFile()));
Map<String, Object> cctConfig = mapper.readValue(cctConfigReader, Map.class)
;
List<Object> elements = (List<Object>)cctConfig.get("elements");
Map<String, Object> orgConfig = (Map<String, Object>)elements.get(0);
CCProfile cClient = PowerMockito.mock(CCProfile.class);
PowerMockito.whenNew(CCProfile.class).withNoArguments().thenReturn(cClient);
PowerMockito.when(cClient.getClientProfileAsMap(anyString())).thenReturn(
orgCfg);
TestActorRef<OrganizationMaster> organizationMasterRef =
TestActorRef.create(teskit.system(),Props.create(OrganizationMaster.class));
organizationMasterRef.tell(orgId, ActorRef.noSender());
}
}
Now am getting an Dead letter
[default-akka.actor.default-dispatcher-4]
[akka://default/user/$$a/$b/businessEntityAggregator-IJZZmj] Message
[java.util.LinkedList] from
Actor[akka://default/user/$$a/$b/$a/$a/$a/$a#1729850332] to
Actor[akka://default/user/$$a/$b/businessEntityAggregator-IJZZmj#112830547]
was not delivered. [1] dead letters encountered. This logging can be turned
off or adjusted with configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
[default-akka.actor.default-dispatcher-4]
[akka://default/user/$$a/$b/businessEntityAggregator-IJZZmj] Message
[java.util.LinkedList] from
Actor[akka://default/user/$$a/$b/$a/$b/$a/$a#-2101218177] to
Actor[akka://default/user/$$a/$b/businessEntityAggregator-IJZZmj#112830547]
was not delivered. [2] dead letters encountered. This logging can be turned
off or adjusted with configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
THis is blocking the flow and file is not created
Can you please help me to resolve above issue and any suggestion to
change actor creation
Thanks in advance
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.