hi,
I have simple actor producer based
on
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-integrations.html#ActorPublisher
public class MessageProducer extends AbstractActorPublisher<Message>
{
private final static Logger logger =
LoggerFactory.getLogger(MessageProducer.class);
private final ArrayDeque<Message> buf;
public MessageProducer(int maxBufferSize)
{
buf = new ArrayDeque<>(maxBufferSize);
receive(ReceiveBuilder
.match(Message.class, msg -> buf.size() ==
maxBufferSize,
msg -> {
if (logger.isTraceEnabled())
{
logger.trace("Denying {}. buffer size is
{}.", msg, buf.size());
}
sender().tell(new Fail<>(msg.getMDC(),
"denied"), self());
})
.match(Message.class,
msg -> {
buf.addLast(msg);
drain(totalDemand());
})
.match(ActorPublisherMessage.Request.class, request ->
drain(totalDemand()))
.match(ActorPublisherMessage.Cancel.class, cancel ->
stop())
.match(ActorPublisherMessage.SubscriptionTimeoutExceeded.class, cancel ->
stop())
.match(Status.Success.class, cancel -> stop())
.match(PoisonPill.class, cancel -> stop())
.matchAny(this::unhandled)
.build()
);
}
@Override
public Duration subscriptionTimeout()
{
return Duration.create(1, TimeUnit.SECONDS);
}
private void drain(long demand)
{
final int bufferSize = buf.size();
logger.debug("Stream is active {}. {}", isActive(), bufferSize);
if (!isActive())
{
return;
}
long maxItems = min(demand, bufferSize);
logger.trace("Draining buffer with {} items. demand is {}.", maxItems,
demand);
Stream
.iterate(0, i -> i + 1)
.limit(maxItems)
.forEach(i -> {
Message msg = buf.poll();
logger.trace("Sending message {}.", msg);
onNext(msg);
});
}
private void stop() {
context().stop(self());
}
}
and I am creating the stream
final Source<Message, ActorRef> stringSource =
Source.actorPublisher(producerProps); // *<--- I construct producer with 5
element buffer but actually it is irrelevant.*
final ActorRef producerRef = stringSource
.map(msg -> msg.toString().toLowerCase())
.to(Sink.foreach(item -> {
logger.info("got message {}", item);
messages.add(item);
}))
.run(materializer);
final int requestedElementsCount = 100;
Thread thread = new Thread(() -> {
Stream.iterate(0, i -> i + 1)
.limit(requestedElementsCount)
.forEach(i -> {
producerRef.tell(new Result<>("Index " + i), noSender());
// sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
});
});
thread.start();
after starting the thread I await if *messages* get *requestedElementsCount*
elements but it never happens unless I add sleep to my thread above.
1) I cannot figure out why is that. first of all MessageProducer is active but
I can see in logs that demand is 0 and then buffer fills up and more messages
are denied. is this my system/jvm/etc?
I though that producer, message publisher and consumer runs on different
threads and there should be no problem with consuming 100 messages, map them
and put items to *messages* list.
here is a sample output
13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE
c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 1
13:21:04.029 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE
c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 0.
*// <---- even there are elements in buffer, demand is 0*
13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE
c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 2
13:21:04.034 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE
c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is
0.
13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE
c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 3
13:21:04.034 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE
c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is
0.
13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE
c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 4
13:21:04.035 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE
c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is
0.
13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE
c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 5
13:21:04.035 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE
c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is
0.
...
13:34:02.449 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE
c.f.d.a.s.p.actor.MessageProducer - Denying
Result{MDC='949a8a9f-6e22-4941-be8d-340453dadeb9',value=Index 9}. buffer size
is 5. *<-- **lots of messages are dropped*
13:34:02.453 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE
c.f.d.a.s.p.actor.MessageProducer - Denying
Result{MDC='949a8a9f-6e22-4941-be8d-340453dadeb9',value=Index 10}. buffer size
is 5.
13:34:02.453 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE
c.f.d.a.s.p.actor.MessageProducer - Denying
Result{MDC='949a8a9f-6e22-4941-be8d-340453dadeb9',value=Index 11}. buffer size
is 5.
...
13:24:42.818 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE
c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 5 *// <---- now
subscriber wakes up and sends demand but there is no more messages than 5 :/*
13:21:04.052 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE
c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 4 items. demand is 4.
13:21:04.055 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE
c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 1 items. demand is 2.
13:21:04.055 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE
c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 3.
2) the other thing I don't understand is how to get value from sink if I change
it to
final ActorRef producerRef = stringSource
.map(msg -> msg.toString().toLowerCase())
.to(*Sink.head()*) *// <---------------------- from foreach to head*
.run(materializer);
head will return feature with first element encountered in stream, but since
run returns actorRef I don't see the way to get that future.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.