Thanks for your answer.

I am more curious whether this comes from my configuration and environment, where 3 threads 
compete for resources (which probably causes the problem I've described), 
or whether it is a general problem with notifying the publisher about demand.
In other words, if I have a fast consumer (and adding to a list is quite fast), 
what is the throughput of the publisher? I cannot imagine dropping almost all 
messages in a real-life scenario.
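
To make the question concrete, here is a minimal single-threaded sketch in plain Java, with no Akka involved. The class DemandSketch, its methods, and the request sizes 16 and 8 are hypothetical stand-ins chosen for illustration; they are not Akka's API or its actual demand pattern. It only models the rule used in the quoted MessageProducer: deny when the buffer is full, otherwise buffer and drain up to the current demand.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a demand-driven publisher with a bounded buffer (assumption: not Akka code). */
final class DemandSketch {
    private final ArrayDeque<String> buf = new ArrayDeque<>();
    private final int maxBufferSize;
    private long demand = 0;                       // stands in for totalDemand()
    final List<String> delivered = new ArrayList<>();
    int denied = 0;

    DemandSketch(int maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
    }

    /** A message arrives from the producing side. */
    void offer(String msg) {
        if (buf.size() == maxBufferSize) {         // same check as the first match in the actor
            denied++;
            return;
        }
        buf.addLast(msg);
        drain();
    }

    /** The downstream side signals that it can accept n more elements. */
    void request(long n) {
        demand += n;
        drain();
    }

    /** Emit buffered elements while there is outstanding demand. */
    private void drain() {
        while (demand > 0 && !buf.isEmpty()) {
            delivered.add(buf.poll());
            demand--;
        }
    }

    public static void main(String[] args) {
        // Case 1: all 100 messages arrive before the first request.
        DemandSketch burst = new DemandSketch(5);
        for (int i = 0; i < 100; i++) {
            burst.offer("Index " + i);
        }
        burst.request(16);                         // hypothetical request size
        System.out.println("burst: delivered=" + burst.delivered.size()
                + " denied=" + burst.denied);      // burst: delivered=5 denied=95

        // Case 2: demand is signalled first and replenished while messages arrive.
        DemandSketch paced = new DemandSketch(5);
        paced.request(16);
        for (int i = 0; i < 100; i++) {
            paced.offer("Index " + i);
            if (i % 8 == 7) {
                paced.request(8);                  // hypothetical replenishment pattern
            }
        }
        System.out.println("paced: delivered=" + paced.delivered.size()
                + " denied=" + paced.denied);      // paced: delivered=100 denied=0
    }
}
```

In this model nothing is lost as long as demand arrives before or alongside the messages; the loss in the first case comes only from the burst arriving while demand is still 0 and the buffer holding 5 elements. Whether the real stream behaves like the first or the second case is exactly what I am asking.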

I hope the above makes sense :)

Thanks for the link for the second question! I somehow overlooked it!

On Wednesday, 5 August 2015 11:25:41 UTC+2, Patrik Nordwall wrote:
>
>
>
> On Sun, Aug 2, 2015 at 1:42 PM, paweł kamiński <[email protected]> wrote:
>
>> Hi,
>> I have a simple actor producer based on 
>> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-integrations.html#ActorPublisher
>>
>> public class MessageProducer extends AbstractActorPublisher<Message>
>> {
>>     private static final Logger logger = LoggerFactory.getLogger(MessageProducer.class);
>>     private final ArrayDeque<Message> buf;
>>
>>     public MessageProducer(int maxBufferSize)
>>     {
>>         buf = new ArrayDeque<>(maxBufferSize);
>>         receive(ReceiveBuilder
>>                         .match(Message.class, msg -> buf.size() == maxBufferSize,
>>                                msg -> {
>>                                    if (logger.isTraceEnabled())
>>                                    {
>>                                        logger.trace("Denying {}. buffer size is {}.", msg, buf.size());
>>                                    }
>>                                    sender().tell(new Fail<>(msg.getMDC(), "denied"), self());
>>                                })
>>                         .match(Message.class,
>>                                msg -> {
>>                                    buf.addLast(msg);
>>                                    drain(totalDemand());
>>                                })
>>                         .match(ActorPublisherMessage.Request.class, request -> drain(totalDemand()))
>>                         .match(ActorPublisherMessage.Cancel.class, cancel -> stop())
>>                         .match(ActorPublisherMessage.SubscriptionTimeoutExceeded.class, cancel -> stop())
>>                         .match(Status.Success.class, cancel -> stop())
>>                         .match(PoisonPill.class, cancel -> stop())
>>                         .matchAny(this::unhandled)
>>                         .build()
>>         );
>>     }
>>
>>     @Override
>>     public Duration subscriptionTimeout()
>>     {
>>         return Duration.create(1, TimeUnit.SECONDS);
>>     }
>>
>>     private void drain(long demand)
>>     {
>>         final int bufferSize = buf.size();
>>
>>         logger.debug("Stream is active {}. {}", isActive(), bufferSize);
>>         if (!isActive())
>>         {
>>             return;
>>         }
>>
>>         long maxItems = min(demand, bufferSize);
>>
>>         logger.trace("Draining buffer with {} items. demand is {}.", maxItems, demand);
>>         Stream
>>                 .iterate(0, i -> i + 1)
>>                 .limit(maxItems)
>>                 .forEach(i -> {
>>                     Message msg = buf.poll();
>>                     logger.trace("Sending message {}.", msg);
>>                     onNext(msg);
>>                 });
>>     }
>>
>>     private void stop()
>>     {
>>         context().stop(self());
>>     }
>> }
>>
>>
>> and I am creating the stream:
>>
>>
>> final Source<Message, ActorRef> stringSource = Source.actorPublisher(producerProps); // *<--- I construct the producer with a 5-element buffer, but actually it is irrelevant.*
>> final ActorRef producerRef = stringSource
>>         .map(msg -> msg.toString().toLowerCase())
>>         .to(Sink.foreach(item -> {
>>             logger.info("got message {}", item);
>>             messages.add(item);
>>         }))
>>         .run(materializer);
>>
>> final int requestedElementsCount = 100;
>> Thread thread = new Thread(() -> {
>>     Stream.iterate(0, i -> i + 1)
>>             .limit(requestedElementsCount)
>>             .forEach(i -> {
>>                 producerRef.tell(new Result<>("Index " + i), noSender());
>>
>> //                        sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
>>             });
>> });
>> thread.start();
>>
>>
>> After starting the thread I wait for *messages* to reach *requestedElementsCount* 
>> elements, but that never happens unless I add a sleep to the thread above.
>>
>>
>> 1) I cannot figure out why that is. First of all, MessageProducer is active, 
>> but I can see in the logs that demand is 0; then the buffer fills up and further 
>> messages are denied. Is this my system/JVM/etc.?
>>
>> I thought that the producer, the message publisher and the consumer run on different 
>> threads, so there should be no problem with consuming 100 messages, mapping them 
>> and putting the items into the *messages* list. 
>>
>>
> You immediately send 100 messages to the MessageProducer. Those will 
> possibly be received before the demand has been requested and therefore 
> they are dropped by your actor.
>  
>
>>
>>
>> Here is a sample output:
>>
>> 13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 1
>>
>> 13:21:04.029 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 
>> 0. *// <---- even though there are elements in the buffer, demand is 0*
>>
>> 13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 2
>>
>> 13:21:04.034 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 
>> 0.
>>
>> 13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 3
>>
>> 13:21:04.034 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 
>> 0.
>>
>> 13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 4
>>
>> 13:21:04.035 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 
>> 0.
>>
>> 13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 5
>>
>> 13:21:04.035 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 
>> 0.
>>
>> ...
>>
>> 13:34:02.449 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Denying 
>> Result{MDC='949a8a9f-6e22-4941-be8d-340453dadeb9',value=Index 9}. buffer 
>> size is 5. *<-- lots of messages are dropped*
>> 13:34:02.453 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Denying 
>> Result{MDC='949a8a9f-6e22-4941-be8d-340453dadeb9',value=Index 10}. buffer 
>> size is 5.
>> 13:34:02.453 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Denying 
>> Result{MDC='949a8a9f-6e22-4941-be8d-340453dadeb9',value=Index 11}. buffer 
>> size is 5.
>>
>> ...
>>
>> 13:24:42.818 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 5 *// <---- now the 
>> subscriber wakes up and sends demand, but there are no more than 5 messages :/*
>>
>> 13:21:04.052 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 4 items. demand is 
>> 4.
>> 13:21:04.055 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 1 items. demand is 
>> 2.
>> 13:21:04.055 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE 
>> c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 
>> 3.
>>
>>
>> 2) The other thing I don't understand is how to get the value from the sink if I 
>> change it to 
>>
>>
>> final ActorRef producerRef = stringSource
>>         .map(msg -> msg.toString().toLowerCase())
>>         .to(*Sink.head()*)   *// <---------------------- from foreach to head*
>>         .run(materializer);
>>
>>
>> head will return a future with the first element encountered in the stream, but since 
>> run returns an ActorRef I don't see a way to get that future.
>>
> I think you will find the answer here: 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-flows-and-basics.html#Combining_materialized_values
>
> /Patrik
>
>  
>
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
>
> Patrik Nordwall
> Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
> Twitter: @patriknw
>
>
