[
https://issues.apache.org/jira/browse/IGNITE-1179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14653221#comment-14653221
]
Darren Edmonds commented on IGNITE-1179:
----------------------------------------
Hi Dmitriy,
The factory returns the appropriate listener for the type of job running
(there is only one type so far), so at the moment it comes down to this line:
Code:
{code:title=FutureListenerFactory.java|borderStyle=solid}
...
return new NodeJobFutureListener(jobService, service, assetStoreService,
job, cacheService);
...
{code}
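For readers without the full source, here is a minimal, self-contained sketch of the "one listener per job type" idea. Everything in it (JobType, JobResult, ListenerFactorySketch, the StringBuilder used as a log) is a hypothetical stand-in and not the actual FutureListenerFactory; it only illustrates dispatching on the job type.

```java
import java.util.function.Consumer;

// Hypothetical job types; the comment above says only one type exists so far.
enum JobType { RENDER }

// Hypothetical stand-in for the result a worker node hands back.
final class JobResult {
    final JobType type;
    final byte[] document;

    JobResult(JobType type, byte[] document) {
        this.type = type;
        this.document = document;
    }
}

final class ListenerFactorySketch {
    // Returns the listener that knows how to handle results of the given job type.
    static Consumer<JobResult> listenerFor(JobType type, StringBuilder log) {
        switch (type) {
            case RENDER:
                // Stand-in for "save the rendered document": just record its size.
                return result -> log.append("render:").append(result.document.length);
            default:
                throw new IllegalArgumentException("No listener for job type " + type);
        }
    }
}
```

Adding a second job type would then mean one more enum constant and one more case, with the calling code unchanged.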
The NodeJobFutureListener itself processes the result. If the render failed,
the job status is updated and, depending on the failure, the scheduler will
either pick the job up for another retry or the job is set as failed.
If successful, the output (a PDF byte array/stream) is saved to our asset store
(the filesystem) for later retrieval.
Finally, the job assets are deleted from the Ignite cache and we release the
job permit (a java.util.concurrent.Semaphore), which the scheduler uses to track
jobs in progress before releasing new ones to the worker nodes.
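The permit handling described above can be sketched roughly as follows. This is an illustration only: JobPermitsSketch and its method names are hypothetical and not the real service API. It assumes the scheduler takes a permit before releasing a job and the listener gives it back once the result has been handled.

```java
import java.util.concurrent.Semaphore;

final class JobPermitsSketch {
    private final Semaphore permits;

    JobPermitsSketch(int maxJobsInProgress) {
        this.permits = new Semaphore(maxJobsInProgress);
    }

    // Scheduler side: true if another job may be released to a worker node.
    boolean tryAcquire() {
        return permits.tryAcquire();
    }

    // Listener side: handle the result and always give the permit back,
    // even if result handling throws.
    void completeJob(Runnable resultHandler) {
        try {
            resultHandler.run();
        } finally {
            permits.release();
        }
    }

    int available() {
        return permits.availablePermits();
    }
}
```

In this sketch a listener that is never invoked never returns its permit, so the scheduler would stop releasing jobs once all permits are taken.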
Code:
{code:title=NodeJobFutureListener.java|borderStyle=solid}
...
private void processResult(IgniteFuture<NodeJobResult> jobResult) throws UpdatingCoreException {
    try {
        NodeJobResult nodeJobResult = jobResult.get();
        if (nodeJobResult == null) {
            job.addStatus(JobStatusEnum.FAILED);
        } else {
            NodeJobError errorHandler = nodeJobResult.getNodeJobErrors();
            if (errorHandler.hasEntries()) {
                saveErrors(job, errorHandler);
            }
            if (!errorHandler.hasFatal()) {
                // Render succeeded: store the document and mark the job as done.
                byte[] resultDocument = nodeJobResult.getDocument();
                saveResult(job, resultDocument, job.getJobType());
                job.addStatus(JobStatusEnum.DONE);
            } else {
                job.addStatus(JobStatusEnum.FAILED);
            }
        }
        updateJob(job);
    } catch (IgniteException ex) {
        Throwable cause = Throwables.getRootCause(ex);
        LOG.warn("JOB [{}] GridException when obtaining result for this object : {}",
            job.getKey(), cause.getMessage());
        if (retryException(cause) && job.getRetryAttempts() < service.getJobRetryLimit()) { // Retry
            LOG.warn("JOB [{}] Reprocess job due to GridException exception - attempting to resubmit.\n\tException is {}",
                job.getKey(), cause.getLocalizedMessage());
            job.setPriority(JobPriorityEnum.HIGH);
            job.retry();
            job.addError(ErrorTypeEnum.ERROR, cause.toString());
            job.addStatus(JobStatusEnum.NEW);
        } else {
            LOG.error("JOB [{}] Failed to process this job. Setting status to FAILED.\n\tException is {}",
                job.getKey(), cause.getMessage());
            job.retry();
            job.addError(ErrorTypeEnum.FATAL, cause.toString());
            job.addStatus(JobStatusEnum.FAILED);
        }
        updateJob(job);
    } finally {
        // Always release the permit so the scheduler can hand out the next job.
        service.releaseJobPermit(job.getKey().toString());
        // Remove any assets from cache.
        DeletedCacheAssetsEvent event =
            cacheService.deleteJobAssetsFromCache(new DeleteCacheAssetsEvent(job));
        if (event.getStatus() == DeletedEvent.StatusEnum.EXCEPTION) {
            LOG.warn("An error occurred removing a jobs assets from the cache.", event.getException());
        }
    }
}
...
{code}
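The retry-or-fail decision in the catch block above boils down to the small rule sketched here. RetryPolicySketch is a hypothetical, self-contained illustration; in particular, treating IOException as the retryable case is an assumption made for the example and is not what retryException(cause) actually checks.

```java
import java.io.IOException;

final class RetryPolicySketch {
    enum Outcome { RETRY, FAILED }

    private final int retryLimit;

    RetryPolicySketch(int retryLimit) {
        this.retryLimit = retryLimit;
    }

    // Hypothetical stand-in for retryException(cause): which failures are worth retrying.
    static boolean isRetryable(Throwable cause) {
        return cause instanceof IOException;
    }

    // Walks the cause chain to the innermost exception.
    static Throwable rootCause(Throwable error) {
        Throwable current = error;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    // Retry only when the root cause is retryable and the attempt budget is not used up.
    Outcome decide(Throwable error, int attemptsSoFar) {
        Throwable root = rootCause(error);
        return isRetryable(root) && attemptsSoFar < retryLimit ? Outcome.RETRY : Outcome.FAILED;
    }
}
```

With a limit of 2, a wrapped IOException is retried on attempts 0 and 1 and fails from attempt 2 onwards; any other root cause fails immediately.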
The assets are removed from the Ignite cache with the following code:
{code:title=CacheServiceImpl.java|borderStyle=solid}
...
public DeletedCacheAssetsEvent deleteGroupAssetsFromCache(DeleteCacheGroupAssetsEvent cacheGroupAssetsEvent) {
    LOG.debug("DeleteCache : Getting group Job IDs...");
    IgniteCache assetCache = grid.cache("ASSET");
    RequestedGroupJobIdsEvent groupJobIdsEvent = jobService.requestJobIds(
        new RequestGroupJobIdsEvent(cacheGroupAssetsEvent.getGroupId()));
    if (groupJobIdsEvent.getStatus() == RequestedEvent.StatusEnum.READ) {
        LOG.debug("DeleteCache : Removing entries from cache...");
        for (UUID jobId : groupJobIdsEvent.getGroupJobIds()) {
            assetCache.remove(jobId.toString());
            assetCache.clear(jobId.toString());
        }
        return new DeletedCacheAssetsEvent(DeletedEvent.StatusEnum.DELETED);
    } else if (groupJobIdsEvent.getStatus() == RequestedEvent.StatusEnum.NOT_FOUND) {
        return new DeletedCacheAssetsEvent(DeletedEvent.StatusEnum.NOT_FOUND);
    } else {
        return new DeletedCacheAssetsEvent(groupJobIdsEvent.getException());
    }
}
...
{code}
> Futures stop working, unknown cause.
> ------------------------------------
>
> Key: IGNITE-1179
> URL: https://issues.apache.org/jira/browse/IGNITE-1179
> Project: Ignite
> Issue Type: Bug
> Components: compute
> Reporter: Darren Edmonds
> Assignee: Andrey Gura
> Attachments: ApplicationLogsAndConfig_1.3.2.7z
>
>
> Running against Apache Ignite version:
> ignite-core (1.3.2), ignite-spring (1.3.2), ignite-indexing (1.3.2) and
> ignite-slf4j (1.3.2).
> Originally built the application against a GridGain version before moving to
> Apache Ignite.
> Web Application feeds jobs to the worker node(s) via its own internal queue
> (client only Ignite). After a few thousand jobs have successfully been
> processed, the worker and server just stop with jobs still waiting in the web
> server queue. It appears that the worker node is running OK, and logs will
> show every job is successfully completing the render but the future on the
> server is not being triggered - loss of connection to node?
> I have attached the requested information to the ticket. It's highly likely
> I've done something wrong rather than this being a bug with Ignite - any
> advice would be appreciated.
> Attachment contains:
> - Server stack
> - Worker node stack
> - Server log file (simple debug output, no output observed from Ignite set at
> INFO level nor any exceptions)
> - Worker log file (same setup as server log file)
> - Ignite configuration files for both server and worker.
> - Launch parameters for the java command line and tomcat7 server.
> Original forum post:
> http://apache-ignite-users.70518.x6.nabble.com/Futures-not-being-triggered-td773.html