Hi Robert, I found this sample test with Timer on processing time. >From the error, I assume there may be is a problem what are you asserting in your PAssert.
https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L3633-L3665 I ran it locally and it runs fine. -Regards Darshan On Mon, May 11, 2020 at 5:28 PM <[email protected]> wrote: > I have a BEAM DoFn that I’m attempting to unit test. It involves using a > timer based on *processing time* and I’ve not managed to get it to fire. > The relevant code excerpts are as follows: > > > > @TimerId("timer") > private final TimerSpec timer = TimerSpecs.*timer*(TimeDomain. > *PROCESSING_TIME*); > > > > @ProcessElement > public void process(@TimerId("timer") Timer timer) { > // Set a processing time timer to fire in 5 seconds so we can poll > BigQuery > timer.offset(Duration.*standardSeconds*(5)).setRelative(); > } > > > > @OnTimer("timer") > public void onTimer() { > System.*out*.println("In onTimer"); > > > > When I use a TestPipeline with an appropriate PAssert, it always results > in the following exception: > > > > org.apache.beam.sdk.Pipeline$PipelineExecutionException: > java.util.NoSuchElementException > > > > at > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348) > > at > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318) > > at > org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213) > > at > org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67) > > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317) > > at > org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350) > > at > org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331) > > at > com.nwm.foundry.atomic.AtomicCommitFnTest.shouldGenerateCorrectEvent(AtomicCommitFnTest.java:28) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > > at > org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319) > > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > > at > com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > > Caused by: java.util.NoSuchElementException > > at java.util.ArrayList$Itr.next(ArrayList.java:862) > > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:302) > > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:254) > > at > org.apache.beam.sdk.testing.PAssert$SingletonCheckerDoFn.processElement(PAssert.java:1417) > > > > Swapping the timer for an EVENT_TIME timer works fine. > > > > Is there a trick I’m missing here? > > > > Kind regards, > > > > Rob > > > > > > *Robert Butcher* > > *Technical Architect | Foundry/SRS | NatWest Markets* > > WeWork, 10 Devonshire Square, London, EC2M 4AE > > Mobile +44 (0) 7414 730866 > > > > This email is classified as *CONFIDENTIAL* unless otherwise stated. > > > > This communication and any attachments are confidential and intended > solely for the addressee. If you are not the intended recipient please > advise us immediately and delete it. Unless specifically stated in the > message or otherwise indicated, you may not duplicate, redistribute or > forward this message and any attachments are not intended for distribution > to, or use by any person or entity in any jurisdiction or country where > such distribution or use would be contrary to local law or regulation. > NatWest Markets Plc or any affiliated entity ("NatWest Markets") accepts > no responsibility for any changes made to this message after it was sent. > Unless otherwise specifically indicated, the contents of this > communication and its attachments are for information purposes only and > should not be regarded as an offer or solicitation to buy or sell a product > or service, confirmation of any transaction, a valuation, indicative price > or an official statement. Trading desks may have a position or interest > that is inconsistent with any views expressed in this message. In > evaluating the information contained in this message, you should know that > it could have been previously provided to other clients and/or internal > NatWest Markets personnel, who could have already acted on it. > NatWest Markets cannot provide absolute assurances that all electronic > communications (sent or received) are secure, error free, not corrupted, > incomplete or virus free and/or that they will not be lost, mis-delivered, > destroyed, delayed or intercepted/decrypted by others. Therefore NatWest > Markets disclaims all liability with regards to electronic communications > (and the contents therein) if they are corrupted, lost destroyed, delayed, > incomplete, mis-delivered, intercepted, decrypted or otherwise > misappropriated by others. > Any electronic communication that is conducted within or through NatWest > Markets systems will be subject to being archived, monitored and produced > to regulators and in litigation in accordance with NatWest Markets’ policy > and local laws, rules and regulations. Unless expressly prohibited by local > law, electronic communications may be archived in countries other than the > country in which you are located, and may be treated in accordance with the > laws and regulations of the country of each individual included in the > entire chain. > Copyright NatWest Markets Plc. All rights reserved. See > https://www.nwm.com/disclaimer for further risk disclosure. >
