I fixed it by rewriting my stage to use FanInShape2 instead (must have been 
something wrong with my shape implementation), but now my graph seems to 
deadlock after a few stage operations.

By putting breakpoints into my onPull() & onPush() implementations, I can 
see that both outlets leading into requestContextBuilderStage are pushed, 
but my breakpoint in RequestContextFactory.apply() never gets hit, and when 
I do an break-all in the debugger, it seems like none of my code is on any 
of the stacks. Any ideas?

On Monday, September 18, 2017 at 7:39:50 PM UTC-7, Bwmat wrote:
>
> I'm using akka-stream_2.11-2.5.4.jar, and I'm getting an exception with 
> the following stack trace when building my graph:
>
> Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
>     at 
> akka.stream.impl.AtomicTraversalBuilder.assign(TraversalBuilder.scala:550)
>     at 
> akka.stream.impl.CompositeTraversalBuilder.assign(TraversalBuilder.scala:1189)
>     at 
> akka.stream.impl.CompositeTraversalBuilder.wire(TraversalBuilder.scala:1299)
>     at akka.stream.scaladsl.GraphDSL$Builder.addEdge(Graph.scala:1177)
>     at 
> akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase$class.$tilde$greater(Graph.scala:1323)
>     at 
> akka.stream.scaladsl.GraphDSL$Implicits$PortOpsImpl.$tilde$greater(Graph.scala:1413)
>     at 
> akka.stream.javadsl.GraphDSL$Builder$ForwardOps.toInlet(Graph.scala:443)
>     at 
> com.simba.cloud.core.data.experimental.akka.Test$TopLevelBuilder.apply(Test.java:164)
>     at 
> com.simba.cloud.core.data.experimental.akka.Test$TopLevelBuilder.apply(Test.java:1)
>     at 
> akka.stream.javadsl.GraphCreate$$anonfun$create$4$$anonfun$apply$2.apply(GraphCreate.scala:34)
>     at 
> akka.stream.javadsl.GraphCreate$$anonfun$create$4$$anonfun$apply$2.apply(GraphCreate.scala:34)
>     at akka.stream.scaladsl.GraphApply$class.create(GraphApply.scala:43)
>     at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:1164)
>     at akka.stream.javadsl.GraphCreate.create(GraphCreate.scala:34)
>     at akka.stream.javadsl.GraphDSL.create(Graph.scala)
>     at 
> com.simba.cloud.core.data.experimental.akka.Test.fetchRows(Test.java:234)
>     at com.simba.cloud.core.data.experimental.akka.Test.main(Test.java:639)
>
> I don't have eclipse set up to debug scala, so I can't really make up what 
> the proximate reason for failure is.
>
> The line causing the failure is the line
>
> in_builder.from(requestSourceStage.m_requestsOut).toInlet(
> requestContextBuilderStage.in0());
>
> from
>
>
>         @Override
>         public ClosedShape apply(
>             Builder<Pair<Sink<ResponseEvent<NodeT>, NotUsed>, 
> SinkQueueWithCancel<Iterator<ICLRow>>>> in_builder,
>             FanOutShape2<ResponseContext<NodeT>, ResponseEvent<NodeT>, 
> Pair<ResponseStatus, RequestContext<NodeT>>> in_errorChecker,
>             SinkShape<Iterator<ICLRow>> in_sink) throws Exception
>         {
>             final SourceShape<Iterator<Map<String, String>>> 
> preReqCallVariables = in_builder.add(getPreReqCallVariableSource());
>             final RequestSourceStageShape<NodeT> requestSourceStage = 
> in_builder.add(new RequestSourceStage<NodeT>(getRequestSource()));
>             final FanInShape2<Pair<CLDataRequest, ICLResponseListener<
> NodeT>>, ICLResponseListener<NodeT>, RequestContext<NodeT>> 
> requestContextBuilderStage =
>                 in_builder.add(ZipWith.create(new RequestContextFactory
> <>()));
>             final FanInShape2<RequestContext<NodeT>, Pair<ResponseStatus, 
> RequestContext<NodeT>>, RequestContext<NodeT>> retryCycleMergeStage = 
> in_builder.add(new RetryCycleMergeStage<NodeT>());
>             final FlowShape<RequestContext<NodeT>, RequestContext<NodeT>> 
> rateLimiterStage = in_builder.add(getRateLimiter()); 
>             final FlowShape<RequestContext<NodeT>, RequestContext<NodeT>> 
> authenticatorStage = in_builder.add(getAuthenticationFlow());
>             final FlowShape<RequestContext<NodeT>, ResponseContext<NodeT>> 
> responseStage =
>                 in_builder.add(Flow.<RequestContext<NodeT>>create()
>                     .map(reqCtxt -> Pair.create(createHttpRequest(reqCtxt.
> Request), reqCtxt))
>                     .via(getRequestToResponseFlow())
>                     .map(pair -> new ResponseContext<NodeT>(pair.second(), 
> pair.first().get(), getErrorHandler().create())));
>             
>             final UniformFanOutShape<ResponseEvent<NodeT>, ResponseEvent<
> NodeT>> routeResponseEventsStage = in_builder.add(Partition.create(2, 
> event -> event.getTarget() == ResponseEvent.Target.DATA_UNMARSHALLER ? 0 : 
> 1));
>             final FanOutShape2<ResponseEvent<NodeT>, Iterator<ICLRow>, 
> ICLResponseListener<NodeT>> unmarshallStage = in_builder.add(new 
> UnmarshallerStage<NodeT>(getUnmarshallerFactory()));
>             
>             in_builder.from(preReqCallVariables).toInlet(
> requestSourceStage.m_preReqCallVariablesIn);
>             in_builder.from(requestSourceStage.m_requestsOut).toInlet(
> requestContextBuilderStage.in0());
>             in_builder.from(requestContextBuilderStage.out()).toInlet(
> retryCycleMergeStage.in0());
>             in_builder.from(retryCycleMergeStage.out()).via(
> rateLimiterStage).via(authenticatorStage).via(responseStage).toInlet(
> in_errorChecker.in());
>             in_builder.from(in_errorChecker.out1()).toInlet(
> retryCycleMergeStage.in1());
>             in_builder.from(in_errorChecker.out0()).toInlet(
> routeResponseEventsStage.in());
>             in_builder.from(routeResponseEventsStage.out(0)).toInlet(
> unmarshallStage.in());
>             in_builder.from(routeResponseEventsStage.out(1)).toInlet(
> requestSourceStage.m_responseEventsIn);
>             in_builder.from(unmarshallStage.out1()).toInlet(
> requestContextBuilderStage.in1());
>             in_builder.from(unmarshallStage.out0()).to(in_sink);
>             
>             return ClosedShape.getInstance();
>         }
>
> in the 
> com.simba.cloud.core.data.experimental.akka.Test$TopLevelBuilder
> class
>
> How do I diagnose the cause of this?
>
> Thanks,
> Matthew w.
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to