[ https://issues.apache.org/jira/browse/BEAM-5673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles resolved BEAM-5673.
-----------------------------------
Resolution: Not A Problem
Assignee: Kenneth Knowles
Fix Version/s: Not applicable
> View.asMap on non-KV PCollection fails at runtime, not construction/submission time
> -----------------------------------------------------------------------------------
>
> Key: BEAM-5673
> URL: https://issues.apache.org/jira/browse/BEAM-5673
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.6.0, 2.7.0
> Reporter: Przemyslaw Pastuszka
> Assignee: Kenneth Knowles
> Priority: Major
> Fix For: Not applicable
>
>
> I'm trying to write a ParDo that uses both a timer and a side input, but when I
> run it with {{beam-runners-direct-java}} it crashes with an
> {{IllegalArgumentException}} at
> [https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java#L167],
> because the ParDo actually has two inputs (the main PCollection and the side
> input), while only one is expected. This looks like a bug in the implementation.
>
> Here's the code that reproduces the issue:
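> {code:java}
> import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
> import static org.apache.beam.sdk.values.TypeDescriptors.strings;
>
> import com.google.common.collect.ImmutableMap;
> import java.util.Map;
> import org.apache.beam.sdk.coders.StringUtf8Coder;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.testing.PAssert;
> import org.apache.beam.sdk.testing.TestPipeline;
> import org.apache.beam.sdk.testing.TestStream;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.View;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.PCollectionView;
> import org.joda.time.Duration;
> import org.junit.Rule;
> import org.junit.Test;
>
> public class TestCrashesForTimerAndSideInput {
>   @Rule public final transient TestPipeline p = TestPipeline.create();
>
>   private static class DoFnWithTimer extends DoFn<KV<String, String>, String> {
>     @TimerId("t")
>     private final TimerSpec tSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>
>     private final PCollectionView<Map<String, String>> sideInput;
>
>     private DoFnWithTimer(PCollectionView<Map<String, String>> sideInput) {
>       this.sideInput = sideInput;
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext c, @TimerId("t") Timer t) {
>       KV<String, String> element = c.element();
>       // Look up by key: the side-input map is keyed by String, not by KV.
>       c.output(element.getKey() + c.sideInput(sideInput).get(element.getKey()));
>       // Set the timer to fire one second of processing time from now.
>       t.offset(Duration.standardSeconds(1)).setRelative();
>     }
>
>     @OnTimer("t")
>     public void onTimerFire(OnTimerContext x) {
>       x.output("Timer fired");
>     }
>   }
>
>   @Test
>   public void testCrashesForTimerAndSideInput() {
>     ImmutableMap<String, String> sideData =
>         ImmutableMap.<String, String>builder().put("x", "X").put("y", "Y").build();
>     // Create.of(Map) yields a PCollection<KV<String, String>>, the input View.asMap() expects.
>     PCollectionView<Map<String, String>> sideInput =
>         p.apply(Create.of(sideData)).apply(View.asMap());
>
>     TestStream<String> testStream =
>         TestStream.create(StringUtf8Coder.of())
>             .addElements("x")
>             .advanceProcessingTime(Duration.standardSeconds(1))
>             .addElements("y")
>             .advanceProcessingTime(Duration.standardSeconds(1))
>             .advanceWatermarkToInfinity();
>
>     PCollection<String> result =
>         p.apply(testStream)
>             .apply(MapElements.into(kvs(strings(), strings())).via(v -> KV.of(v, v)))
>             .apply(ParDo.of(new DoFnWithTimer(sideInput)).withSideInputs(sideInput));
>
>     PAssert.that(result).containsInAnyOrder("xX", "yY", "Timer fired");
>     p.run();
>   }
> }
> {code}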
>
> and the error is:
> {code}
> java.lang.IllegalArgumentException: expected one element but was: <ParDo(DoFnWithTimer)/ParMultiDo(DoFnWithTimer)/To KeyedWorkItem/ParMultiDo(ToKeyedWorkItem).output [PCollection], View.AsMap/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization).output [PCollection]>
>     at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:322)
>     at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
>     at org.apache.beam.runners.direct.QuiescenceDriver.fireTimers(QuiescenceDriver.java:167)
>     at org.apache.beam.runners.direct.QuiescenceDriver.drive(QuiescenceDriver.java:110)
>     at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$2.run(ExecutorServiceParallelExecutor.java:170)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)