Hi! I am trying to get a Watch transform that always reads the whole file if
it was changed at all. I can get this working in Beam 2.8 but get the
following error when using 2.9:
java.lang.IllegalArgumentException:
org.apache.beam.sdk.transforms.Watch$WatchGrowthFn, @ProcessElement
process(ProcessContext, GrowthTracker): Has tracker type
Watch.GrowthTracker<OutputT, KeyT, TerminationStateT>, but the DoFn's
tracker type must be of type RestrictionTracker.
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.throwIllegalArgument(DoFnSignatures.java:1507)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.checkArgument(DoFnSignatures.java:1512)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.verifySplittableMethods(DoFnSignatures.java:593)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.parseSignature(DoFnSignatures.java:472)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.lambda$getSignature$0(DoFnSignatures.java:140)
at java.util.HashMap.computeIfAbsent(HashMap.java:1126)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getSignature(DoFnSignatures.java:140)
at org.apache.beam.sdk.transforms.ParDo.validate(ParDo.java:546)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:393)
at org.apache.beam.sdk.transforms.Watch$Growth.expand(Watch.java:689)
at org.apache.beam.sdk.transforms.Watch$Growth.expand(Watch.java:157)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
at org.apache.beam.sdk.io.FileIO$MatchAll.expand(FileIO.java:614)
at org.apache.beam.sdk.io.FileIO$MatchAll.expand(FileIO.java:572)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
at co.motherbrain.cyrano.LogReader.expand(LogReader.java:93)
at co.motherbrain.cyrano.LogReader.expand(LogReader.java:35)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:167)
at co.motherbrain.cyrano.LogReaderTest.testReadOnce(LogReaderTest.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Has something changed between the versions that I need to know about, or is
this a bug?
I get the same error when using FileIO.matchAll().continuously(), by the way,
so it looks like a bug, but I thought I should check before filing as this is
not something I'm super familiar with.
Br,
Vilhelm von Ehrenheim