Here is my really hacky solution to the execution bounds problem:
import java.lang.reflect.Field
import akka.actor.ActorSystem
import akka.stream.testkit.{TestPublisher, TestSubscriber}
import akka.testkit.{TestKit, TestProbe}
import scala.concurrent.duration.FiniteDuration
/**
* Really hacky stuff to workaround limitations in Akka TestKit.
*/
trait AkkaHacker {
this: TestKit =>
/**
* Creates a new [[TestPublisher.Probe]] that shares the execution
bounds with the enclosing [[TestKit]].
*/
def pubProbe[T](initialPendingRequests: Long = 0)(implicit system:
ActorSystem): TestPublisher.Probe[T] = {
val probe = TestPublisher.probe[T](initialPendingRequests)
probe.innerProbe = new SharedTimeFactorProbe(system)
probe
}
/**
* Creates a new [[TestSubscriber.Probe]] that shares the execution
bounds with the enclosing [[TestKit]].
*/
def subProbe[T]()(implicit system: ActorSystem): TestSubscriber.Probe[T]
= {
val probe = TestSubscriber.probe[T]()
probe.innerProbe = new SharedTimeFactorProbe(system)
probe
}
/**
* Creates a [[TestProbe]] that shares the execution bounds with the
enclosing [[TestKit]].
*
* @param system the actor system for the probe
*/
class SharedTimeFactorProbe(system: ActorSystem) extends
TestProbe(system) {
override def remaining = AkkaHacker.this.remaining
override def remainingOr(duration: FiniteDuration) =
AkkaHacker.this.remainingOr(duration)
override def remainingOrDefault: FiniteDuration =
AkkaHacker.this.remainingOrDefault
override def within[T](min: FiniteDuration, max: FiniteDuration)(f: =>
T): T = AkkaHacker.this.within(min, max)(f)
override def within[T](max: FiniteDuration)(f: => T): T =
AkkaHacker.this.within(max)(f)
}
trait InnerProbe {
protected def outerProbe: Any
protected def probeField: Field = {
// Need the declared class not the runtime class.
val outerProbeClass =
this.getClass.getDeclaredMethod("outerProbe").getReturnType
val field = outerProbeClass.getDeclaredFields.find(_.getType ==
classOf[TestProbe]).get
field.setAccessible(true)
field
}
def innerProbe: TestProbe =
probeField.get(outerProbe).asInstanceOf[TestProbe]
def innerProbe_=(probe: TestProbe) = probeField.set(outerProbe, probe)
def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T =
innerProbe.within(min, max)(f)
def within[T](max: FiniteDuration)(f: => T): T =
innerProbe.within(max)(f)
}
implicit class RichPubProbe[I](override val outerProbe:
TestPublisher.ManualProbe[I]) extends InnerProbe
implicit class RichSubProbe[I](override val outerProbe:
TestSubscriber.ManualProbe[I]) extends InnerProbe
}
This is definitely going to come back and bite me but oh well at least it
seems to work for now (e.g. this doesn't restore the end time back once we
leave the within block but too bad...).
Now in my test I could do something along the lines of:
within(1.second) {
sourceProbe.expectNoMsg()
sinkProbe.expectNoMsg()
expectNoMsg()
}
And they all share the timeout.
On Wednesday, 31 August 2016 11:04:30 UTC+12, Jason Steenstra-Pickens wrote:
>
> Hi,
>
> I'm trying to use the Probes in *StreamTestKit* but I have hit a few
> issues, specifically relating to the various probes.
>
> The main one is that for all types of probes (*Publisher*/*Subscriber*
> *ManualProbe*/*Probe*) the actual *TestProbe* is encapsulated as a
> private member and most of the functions just delegate to this inner probe.
> The problem is that there is no way to set the *end* time which would
> normally be done via the *within* construct.
>
> The second issue is the combination of the fact that
> *TestPublisher.Probe#ensureSubscription* does not return the underlying
> *Subscription* and this class does not override the *expectSubscription* to
> initialise the lazy val. The consequence of this is that you cannot get the
> *Subscription* and then use any of the other functions that internally
> use the lazy val otherwise it will expect two subscriptions.
>
> Lastly, a relatively minor but frustrating issue is that
> *ManualProbe#expectSubscription* returns a *PublisherProbeSubscription* which
> has some useful functions for doing things on a per-subscription basis
> rather than for the whole Publisher. However *StreamTestKit* is private
> so even though *PublisherProbeSubscription* escapes out, you can't refer
> to its type so writing helper functions etc is not possible.
>
> I was just hoping to raise these and get some feedback around whether
> anyone else had noticed these or perhaps I'm just using it wrong.
>
> I really like Akka Streams and I'm hoping to get into a better position to
> start contributing back. Keep up the good work!
>
>
> Cheers,
> Jason
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.