I have a Beam DoFn that I'm trying to unit test. It uses a processing-time
timer, and I have not managed to get that timer to fire. The relevant code
excerpts are as follows:

    @TimerId("timer")
    private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    @ProcessElement
    public void process(@TimerId("timer") Timer timer) {
      // Set a processing time timer to fire in 5 seconds so we can poll BigQuery
      timer.offset(Duration.standardSeconds(5)).setRelative();
    }

    @OnTimer("timer")
    public void onTimer() {
      System.out.println("In onTimer");
    }

When I use a TestPipeline with an appropriate PAssert, it always results in the
following exception:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.util.NoSuchElementException
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
    at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
    at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
    at com.nwm.foundry.atomic.AtomicCommitFnTest.shouldGenerateCorrectEvent(AtomicCommitFnTest.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.util.NoSuchElementException
    at java.util.ArrayList$Itr.next(ArrayList.java:862)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:302)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:254)
    at org.apache.beam.sdk.testing.PAssert$SingletonCheckerDoFn.processElement(PAssert.java:1417)

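
My reading of the "Caused by" frames is that PAssert's singleton check called
getOnlyElement on an empty collection, which would suggest that nothing reached
the assertion because the timer never fired. To illustrate that failure mode
only, here is a minimal plain-Java sketch with no Beam involved. The class
EmptySingletonDemo and the helper onlyElement are hypothetical stand-ins of my
own, not Beam's or Guava's code:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class EmptySingletonDemo {

    // Hypothetical stand-in for a "return the only element" check.
    // Like the frames in the trace, it fails inside Iterator.next()
    // when the collection is empty.
    static <T> T onlyElement(List<T> values) {
        Iterator<T> it = values.iterator();
        T first = it.next(); // throws NoSuchElementException on an empty list
        if (it.hasNext()) {
            throw new IllegalArgumentException("expected exactly one element");
        }
        return first;
    }

    public static void main(String[] args) {
        // Assumption: models an output collection that received no
        // elements because the timer callback never ran.
        List<String> timerOutput = new ArrayList<>();
        try {
            onlyElement(timerOutput);
        } catch (NoSuchElementException e) {
            System.out.println("empty output: " + e.getClass().getName());
        }
    }
}
```

If that reading is right, the exception is a symptom of the missing output
rather than a problem in the assertion itself.
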
Swapping the timer for an EVENT_TIME timer works fine.
Is there a trick I'm missing here?
Kind regards,
Rob
Robert Butcher
Technical Architect | Foundry/SRS | NatWest Markets
WeWork, 10 Devonshire Square, London, EC2M 4AE
Mobile +44 (0) 7414 730866