Could this be an akka-streams bug? Should I open an issue on GitHub?
In the meantime I am using my own inferior throttle, so I have a workaround:
final static class Tick {}

// Note: despite its name, throttleSecs is passed to FiniteDuration as milliseconds.
public static <T> Flow<T, T, BoxedUnit> createFlow(final long throttleSecs) {
    Flow<T, T, BoxedUnit> sourceflow = Flow.<T>create();
    Flow<T, Pair<T, Tick>, BoxedUnit> flow = Flow.fromGraph(
        GraphDSL.create(builder -> {
            final FlowShape<T, T> source = builder.add(sourceflow);
            // Emits a Tick immediately, then one per interval.
            Source<Tick, Cancellable> tickSource =
                Source.tick(FiniteDuration.apply(0, "millis"),
                    FiniteDuration.apply(throttleSecs, "millis"), new Tick());
            final FanInShape2<T, Tick, Pair<T, Tick>> zipper =
                builder.add(Zip.create());
            SourceShape<Tick> tickSourceShape = builder.add(tickSource);
            // Each element must be paired with a Tick before it is emitted.
            builder.from(source).toInlet(zipper.in0());
            builder.from(tickSourceShape).toInlet(zipper.in1());
            return FlowShape.of(source.in(), zipper.out());
        }));
    // Drop the Tick and keep only the original element.
    return flow.map(Pair::first);
}
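
For anyone reading along, the pairing rule this workaround relies on can be sketched in plain Java, without Akka. This is only a rough model under stated assumptions: the class name TickZipSketch and its methods are hypothetical, elements wait in an unbounded queue instead of being back-pressured, and at most one tick is held while no element is waiting (extra ticks are dropped). It is not how Akka implements Zip or Source.tick.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of "zip each element with a tick": an element is
// emitted only once a tick is available to pair with it.
final class TickZipSketch<T> {
    private final Queue<T> waiting = new ArrayDeque<>();
    private final List<T> emitted = new ArrayList<>();
    private boolean tickAvailable = false;

    // A new element arrives from upstream.
    void offer(T element) {
        waiting.add(element);
        pair();
    }

    // A tick fires. Assumption: only one tick is remembered, so ticks
    // that fire while one is already pending have no further effect.
    void tick() {
        tickAvailable = true;
        pair();
    }

    // Emit one element if both an element and a tick are present.
    private void pair() {
        if (tickAvailable && !waiting.isEmpty()) {
            emitted.add(waiting.remove());
            tickAvailable = false;
        }
    }

    // Elements emitted so far, in order.
    List<T> emitted() {
        return emitted;
    }
}
```

Driving tick() from a timer would then release at most one queued element per interval, which is the effect the Zip-based flow above is meant to have.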
On Wednesday, 2 March 2016 at 15:12:00 UTC+1, [email protected] wrote:
>
>
> Hi Endre,
> many thanks for offering to help!
>
> If you run the code you will see that only "success : Pipeline 2" is
> printed to System.out.
>
> If I remove the throttle, everything works as expected:
>
> //Source<Object, BoxedUnit> pipeline = source.via(throttle).mapConcat(t -> t);
> Source<Object, BoxedUnit> pipeline = source.mapConcat(t -> t);
>
>
> Am I doing anything wrong or stupid?
>
> Here is the code again:
>
> package test;
>
> import akka.actor.ActorSystem;
> import akka.dispatch.Futures;
> import akka.japi.function.Function;
> import akka.stream.ActorMaterializer;
> import akka.stream.ActorMaterializerSettings;
> import akka.stream.Materializer;
> import akka.stream.Supervision;
> import akka.stream.javadsl.Flow;
> import akka.stream.javadsl.Sink;
> import akka.stream.javadsl.Source;
> import scala.Option;
> import scala.Tuple2;
> import scala.concurrent.Promise;
> import scala.concurrent.duration.FiniteDuration;
> import scala.runtime.BoxedUnit;
>
> import java.util.ArrayList;
> import java.util.Collection;
>
>
> public class MailThrottleTest {
>     static ActorSystem system = ActorSystem.create("TestThrotteling");
>
>     public static void main(String[] args) throws Exception {
>         MailThrottleTest mailThrottleTest = new MailThrottleTest();
>         Source<Object, BoxedUnit> pipe1 =
>             mailThrottleTest.createPipeLine("Pipeline 1");
>         Source<Object, BoxedUnit> pipe2 =
>             mailThrottleTest.createPipeLine("Pipeline 2");
>
>         final Function<Throwable, Supervision.Directive> decider = exc -> {
>             return Supervision.restart();
>         };
>         final Materializer mat1 = ActorMaterializer.create(
>             ActorMaterializerSettings.create(system).withSupervisionStrategy(decider),
>             system);
>         final Materializer mat2 = ActorMaterializer.create(
>             ActorMaterializerSettings.create(system).withSupervisionStrategy(decider),
>             system);
>         pipe1.to(Sink.foreach(object -> {
>             System.out.println("Got Object pipe 1 : " + object);
>         })).run(mat1);
>
>         pipe2.to(Sink.foreach(object -> {
>             System.out.println("Got Object pipe 2 : " + object);
>         })).run(mat2);
>         system.awaitTermination();
>     }
>
>     public Source<Object, BoxedUnit> createPipeLine(final String name)
>             throws Exception {
>         final Promise<Option<Tuple2<Object, Collection<Object>>>> promise =
>             Futures.promise();
>         Source<Collection<Object>, BoxedUnit> source = Source.unfoldAsync(null,
>             p -> {
>                 System.out.println("success : " + name);
>                 return promise.future();
>             });
>
>         Flow<Collection<Object>, Collection<Object>, BoxedUnit> throttle =
>             Flow.<Collection<Object>>create().throttle(1,
>                 FiniteDuration.apply(2000, "millis"), 1,
>                 new akka.stream.ThrottleMode.Shaping$());
>
>         Source<Object, BoxedUnit> pipeline =
>             source.via(throttle).mapConcat(t -> t);
>
>         promise.success(Option.apply(new Tuple2(null, new ArrayList())));
>         return pipeline;
>     }
> }
>
> On Wednesday, 2 March 2016 at 13:43:21 UTC+1, drewhk wrote:
>>
>> Hi John,
>>
>> Can you prepare a small reproducer? It might be a bug, but we can only be
>> sure if we see some code that exhibits the behavior.
>>
>> -Endre
>>
>> On Wed, Mar 2, 2016 at 1:40 PM, <[email protected]> wrote:
>>
>>> I am starting several instances of an Akka Streams pipeline.
>>>
>>> The pipeline contains a throttle stage (Flow.create().throttle(..)).
>>> What happens is that only the first pipeline works; the other
>>> pipelines all stall.
>>> Any ideas?
>>> Many greetings,
>>> John
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>