[https://issues.apache.org/jira/browse/BEAM-12618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17381405#comment-17381405]
Minbo Bae commented on BEAM-12618:
----------------------------------
`use_indexed_format` is added at
[DataflowPipelineTranslator.StepTranslator.addOutput|https://github.com/apache/beam/blob/v2.31.0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java#L754]
if the PCollection is in
[pcollectionsRequiringIndexedFormat|https://github.com/apache/beam/blob/v2.31.0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L234].
A PCollection is added to `pcollectionsRequiringIndexedFormat` when a view is
overridden with
[BatchViewOverrides|https://github.com/apache/beam/blob/v2.31.0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java#L1103],
but I guess the override only happens on the first run, not on the second run,
because the graph already contains the replacement transforms. So the pipeline
graph is the same for both runs, but it translates differently into the
Dataflow Service API Job entity. I guess using a copy of the original pipeline
graph in `DataflowRunner.run()` would fix this issue.
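The mechanism can be illustrated with a self-contained sketch (all names below
are hypothetical stand-ins, not Beam code): the set corresponding to
`pcollectionsRequiringIndexedFormat` is repopulated on every run, but the view
override mutates the shared graph in place, so only the first run records the
PCollection and tags its output with `use_indexed_format`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the bug (not Beam APIs): Graph stands in for the
// user's Pipeline object, which transform overrides mutate in place.
public class DoubleRunSketch {
  static class Graph {
    boolean viewsReplaced = false; // set once the view override has fired
  }

  // Models one runner.run() invocation: the "requiring indexed format" set
  // is fresh per run, but the graph mutation persists across runs.
  static boolean translateTagsIndexedFormat(Graph graph) {
    Set<String> pcollectionsRequiringIndexedFormat = new HashSet<>();
    if (!graph.viewsReplaced) {
      graph.viewsReplaced = true; // in-place replacement survives this run
      pcollectionsRequiringIndexedFormat.add("viewPCollection");
    }
    // Translation adds use_indexed_format only for recorded PCollections.
    return pcollectionsRequiringIndexedFormat.contains("viewPCollection");
  }

  public static void main(String[] args) {
    Graph graph = new Graph();
    System.out.println(translateTagsIndexedFormat(graph)); // first job: true
    System.out.println(translateTagsIndexedFormat(graph)); // second job: false
  }
}
```

Translating a copy of the original graph (or rebuilding the pipeline before
each run) would make the second run identical to the first in this sketch.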
> Check state failed at IsmSideInputReader.createReadersFromSources
> -----------------------------------------------------------------
>
> Key: BEAM-12618
> URL: https://issues.apache.org/jira/browse/BEAM-12618
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Reporter: Minbo Bae
> Priority: P2
>
> If a pipeline has a transform with a side input and you call `pipeline.run()`
> twice to create Dataflow jobs, the second job fails with the runtime error
> below in the Dataflow worker.
> {code:java}
> Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException: IsmSideInputReader only supports IsmReader as a reader but was AvroByteReader.
> org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.lambda$getSideInputReader$0(BatchModeExecutionContext.java:297)
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers$NonSerializableMemoizingSupplier.get(Suppliers.java:167)
> org.apache.beam.runners.dataflow.worker.LazilyInitializedSideInputReader.get(LazilyInitializedSideInputReader.java:50)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:267)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$1200(SimpleDoFnRunner.java:84)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:395)
> org.apache.beam.sdk.transforms.Contextful$Fn$Context$ContextFromProcessContext.sideInput(Contextful.java:100)
> baeminbo.DoubleRunSideinputPipeline.lambda$main$38d062d6$1(DoubleRunSideinputPipeline.java:34)
> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:143)
> Caused by: java.lang.IllegalStateException: IsmSideInputReader only supports IsmReader as a reader but was AvroByteReader.
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
> org.apache.beam.runners.dataflow.worker.IsmSideInputReader.createReadersFromSources(IsmSideInputReader.java:253)
> org.apache.beam.runners.dataflow.worker.IsmSideInputReader.<init>(IsmSideInputReader.java:165)
> org.apache.beam.runners.dataflow.worker.IsmSideInputReader.of(IsmSideInputReader.java:278)
> org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.lambda$getSideInputReader$0(BatchModeExecutionContext.java:290)
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers$NonSerializableMemoizingSupplier.get(Suppliers.java:167)
> org.apache.beam.runners.dataflow.worker.LazilyInitializedSideInputReader.get(LazilyInitializedSideInputReader.java:50)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:267)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$1200(SimpleDoFnRunner.java:84)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:395)
> org.apache.beam.sdk.transforms.Contextful$Fn$Context$ContextFromProcessContext.sideInput(Contextful.java:100)
> baeminbo.DoubleRunSideinputPipeline.lambda$main$38d062d6$1(DoubleRunSideinputPipeline.java:34)
> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:143)
> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:339)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:420)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:389)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:834)
> {code}
> You can reproduce this issue with [this sample
> pipeline|https://github.com/baeminbo/dataflow-pipelines/tree/master/double-run-sideinput/].
--
This message was sent by Atlassian Jira
(v8.3.4#803005)