Hi,

I have a simple actor producer based on
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-integrations.html#ActorPublisher

public class MessageProducer extends AbstractActorPublisher<Message>
{
    private final static Logger logger = LoggerFactory.getLogger(MessageProducer.class);
    private final ArrayDeque<Message> buf;

    public MessageProducer(int maxBufferSize)
    {
        buf = new ArrayDeque<>(maxBufferSize);
        receive(ReceiveBuilder
                        .match(Message.class, msg -> buf.size() == maxBufferSize,
                               msg -> {
                                   if (logger.isTraceEnabled())
                                   {
                                       logger.trace("Denying {}. buffer size is {}.", msg, buf.size());
                                   }
                                   sender().tell(new Fail<>(msg.getMDC(), "denied"), self());
                               })
                        .match(Message.class,
                               msg -> {
                                   buf.addLast(msg);
                                   drain(totalDemand());
                               })
                        .match(ActorPublisherMessage.Request.class, request -> drain(totalDemand()))
                        .match(ActorPublisherMessage.Cancel.class, cancel -> stop())
                        .match(ActorPublisherMessage.SubscriptionTimeoutExceeded.class, cancel -> stop())
                        .match(Status.Success.class, cancel -> stop())
                        .match(PoisonPill.class, cancel -> stop())
                        .matchAny(this::unhandled)
                        .build()
        );
    }

    @Override
    public Duration subscriptionTimeout()
    {
        return Duration.create(1, TimeUnit.SECONDS);
    }

    private void drain(long demand)
    {


        final int bufferSize = buf.size();

        logger.debug("Stream is active {}. {}", isActive(), bufferSize);
        if (!isActive())
        {
            return;
        }

        long maxItems = min(demand, bufferSize);

        logger.trace("Draining buffer with {} items. demand is {}.", maxItems, demand);
        Stream
                .iterate(0, i -> i + 1)
                .limit(maxItems)
                .forEach(i -> {
                    Message msg = buf.poll();
                    logger.trace("Sending message {}.", msg);
                    onNext(msg);
                });
    }

    private void stop()
    {
        context().stop(self());
    }
}


and I am creating the stream like this:


// producerProps constructs the producer with a 5-element buffer (the size is actually irrelevant)
final Source<Message, ActorRef> stringSource = Source.actorPublisher(producerProps);
final ActorRef producerRef = stringSource
        .map(msg -> msg.toString().toLowerCase())
        .to(Sink.foreach(item -> {
            logger.info("got message {}", item);
            messages.add(item);
        }))
        .run(materializer);

final int requestedElementsCount = 100;
Thread thread = new Thread(() -> {
    Stream.iterate(0, i -> i + 1)
            .limit(requestedElementsCount)
            .forEach(i -> {
                producerRef.tell(new Result<>("Index " + i), noSender());

//                        sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
            });
});
thread.start();


After starting the thread I wait for *messages* to contain *requestedElementsCount* 
elements, but that never happens unless I add the sleep to the thread above.


1) I cannot figure out why that is. MessageProducer is active, but I can see 
in the logs that demand is 0; the buffer then fills up and further messages 
are denied. Is this down to my system/JVM/etc.?

I thought that the producer thread, the message publisher and the consumer run 
on different threads, so there should be no problem consuming 100 messages, 
mapping them and putting the items into the *messages* list.
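
To make the sequence I think I am seeing concrete, here is a minimal plain-Java sketch of a bounded buffer that is drained only by downstream demand. It does not use Akka at all: BoundedPublisherModel, offer, request and run are hypothetical names made up for illustration, and the assumption that the first demand arrives only after a burst of messages is a guess based on the logs, not something confirmed about Akka's internals.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (no Akka) of a bounded buffer drained by downstream demand. */
public class BoundedPublisherModel {
    private final ArrayDeque<String> buf = new ArrayDeque<>();
    private final int maxBufferSize;
    private long demand = 0; // outstanding downstream demand

    public final List<String> delivered = new ArrayList<>();
    public final List<String> denied = new ArrayList<>();

    public BoundedPublisherModel(int maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
    }

    /** A message arrives: deny it if the buffer is full, otherwise buffer it and drain. */
    public void offer(String msg) {
        if (buf.size() == maxBufferSize) {
            denied.add(msg);
            return;
        }
        buf.addLast(msg);
        drain();
    }

    /** Downstream signals that it can accept n more elements. */
    public void request(long n) {
        demand += n;
        drain();
    }

    /** Hand over buffered elements while there is both demand and data. */
    private void drain() {
        while (demand > 0 && !buf.isEmpty()) {
            delivered.add(buf.poll());
            demand--;
        }
    }

    /**
     * Sends `total` messages. The single downstream request (for `total` elements)
     * arrives just before message number `firstRequestAfter`, or after the whole
     * burst if `firstRequestAfter >= total`.
     */
    public static BoundedPublisherModel run(int total, int bufferSize, int firstRequestAfter) {
        BoundedPublisherModel model = new BoundedPublisherModel(bufferSize);
        for (int i = 0; i < total; i++) {
            if (i == firstRequestAfter) {
                model.request(total);
            }
            model.offer("Index " + i);
        }
        if (firstRequestAfter >= total) {
            model.request(total);
        }
        return model;
    }

    public static void main(String[] args) {
        BoundedPublisherModel early = run(100, 5, 0);
        // prints "demand first: delivered=100 denied=0"
        System.out.println("demand first: delivered=" + early.delivered.size()
                + " denied=" + early.denied.size());

        BoundedPublisherModel late = run(100, 5, 100);
        // prints "burst first: delivered=5 denied=95"
        System.out.println("burst first: delivered=" + late.delivered.size()
                + " denied=" + late.denied.size());
    }
}
```

In this model the outcome depends only on whether demand arrives before or after the burst, which would be consistent with the sleep in my sending thread changing the result.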


Here is a sample of the output:

13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 1

13:21:04.029 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 0. *// <---- even though there are elements in the buffer, demand is 0*

13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 2

13:21:04.034 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 0.

13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 3

13:21:04.034 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 0.

13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 4

13:21:04.035 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 0.

13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 5

13:21:04.035 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 0.

...

13:34:02.449 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Denying Result{MDC='949a8a9f-6e22-4941-be8d-340453dadeb9',value=Index 9}. buffer size is 5. *<-- lots of messages are dropped*
13:34:02.453 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Denying Result{MDC='949a8a9f-6e22-4941-be8d-340453dadeb9',value=Index 10}. buffer size is 5.
13:34:02.453 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Denying Result{MDC='949a8a9f-6e22-4941-be8d-340453dadeb9',value=Index 11}. buffer size is 5.

...

13:24:42.818 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 5 *// <---- now the subscriber wakes up and sends demand, but there are no more than 5 messages left :/*

13:21:04.052 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 4 items. demand is 4.
13:21:04.055 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 1 items. demand is 2.
13:21:04.055 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 3.


2) The other thing I don't understand is how to get the value from the sink if 
I change it to 


final ActorRef producerRef = stringSource
        .map(msg -> msg.toString().toLowerCase())
        .to(Sink.head())   // <--- changed from foreach to head
        .run(materializer);


head will return a future with the first element encountered in the stream, but 
since run returns an ActorRef I don't see a way to get at that future.
