Hy there,

I have an akka scheduler to reindex some data in the search-engine and to 
resize the images of every user.

It will be called directly by

system.scheduler().scheduleOnce(FiniteDuration.create(Long.valueOf(1), 
TimeUnit.SECONDS), reindexActor, "", system.dispatcher(), null);

The (flatted to one method for better readability) actor:
 
       @Override
        public void onReceive(Object msg) throws Exception
        {
            long start = System.currentTimeMillis();
            logger.info("Start reindexing of all User");
            
            if (indexing) {
                logger.info("Already reindexing. Skip");
                return;
            }
            
            try {
                int page = 0;
                PagedList<User> users;
                
                do {
                    users = User.find.findPagedList(page, COUNT_OF_ROWS);
                    List<User> active = users.getList().stream().filter(g -> 
g.isActive()).collect(Collectors.toList());
                    
                    List<CompletionStage<Response>> stages = new ArrayList<
CompletionStage<Response>>(COUNT_OF_ROWS);
                    for (User user : active) {
                        ActorRef userActor = system.actorOf(Props.create(
DependencyInjector.class, injector, UpdateRedisActor.class));
                        userActor.tell(new UpdateRedisActor.Index(user.getId
()), null);
                        
                        if (user.hasProfilePicture()) {
                            /**
                             * get the image as FilePart (imitate http 
upload)
                             */
                            File image = new File(user.getProfilePicture().
getFileName());
                            FilePart<Object> filePart = new FilePart<Object
>("", user.getFirstname(), "image/jpg", image);
                            
                            String randomFileName = UUID.randomUUID().
toString();
                            
                            /**
                             * Create new actor
                             */
                            ActorRef imgActor = system.actorOf(Props.create(
DependencyInjector.class, injector, ImageActor.class));
                            
                            /**
                             * And ask it
                             */
                            CompletionStage<Response> stage = 
FutureConverters
                                    .toJava(ask(imgActor, new ImageActor.
Request(filePart, randomFileName, imageResizeOptionFactory.getValues()), 
ImageBroker.TEN_SECONDS * 20))
                                    .thenApply((response) -> ((ImageActor.
Response) response))
                                    .exceptionally((e) -> {
                                        Logger.error("Error while creating 
picture [" + randomFileName + "]", e);
                                        return new ImageActor.Response(500);
                                    });
                            
                            /**
                             * Collect stages
                             */
                            stages.add(stage.toCompletableFuture().
thenApplyAsync(response -> {
                                user.updateAndSetProfileImage(response.
fileName, response.filePath);
                                user.save();
                                Logger.info("User " + user.getId() + " 
saved with new image");
                                return response;
                            }));
                        }
                    }
                    /**
                     * Wait every iteration for the actors
                     */
                    CompletableFuture.allOf(stages.toArray(new 
CompletableFuture<?>[stages.size()])).toCompletableFuture().get();
                }
                while (users.hasNext());
            }
            catch (Exception e) {
                logger.error("Fatal error while reindexing: ", e);
            }
            finally {
                logger.info("Finish reindexing of all user in " + (System.
currentTimeMillis() - start) + "ms"); //does not appear in logfile.
                indexing = false;
            }
        }
        
    }

My expectation was that the scheduler fire-and-forget to the redis actor 
and then asks the image-actor. Collect the result from the image actor and 
update the user with the new image. Of course the iteration ends before all 
images were resized.  

What happened is that the scheduler fire-and-forget to the redis actor 
(works!) and resizes two images (Sometimes 3 or 1 or 0, but never more and 
in most cases two). The actor-system stops immediately every work. The 
count of threads decreases and the cpu-loads sinks down. But (and that is 
the most strange part): The webserver does not react anymore to any 
request. It has to be restarted.  

All works fine if I do not collect the stages, but wait for every stage 
(..toCompletableFuture().get()). But that seems not the way for an 
asynchronous system.

Has anyone an idea what my mistake is?

Thanks and best regards
Simon

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to