[ https://issues.apache.org/jira/browse/BEAM-12618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Minbo Bae updated BEAM-12618:
-----------------------------
    Description: 
If a pipeline has a transform with a side input and you call `pipeline.run()` twice to create two Dataflow jobs, the second job fails on the Dataflow worker with the runtime error below.
{code}
Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException: IsmSideInputReader only supports IsmReader as a reader but was AvroByteReader.
        org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.lambda$getSideInputReader$0(BatchModeExecutionContext.java:297)
        org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers$NonSerializableMemoizingSupplier.get(Suppliers.java:167)
        org.apache.beam.runners.dataflow.worker.LazilyInitializedSideInputReader.get(LazilyInitializedSideInputReader.java:50)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:267)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$1200(SimpleDoFnRunner.java:84)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:395)
        org.apache.beam.sdk.transforms.Contextful$Fn$Context$ContextFromProcessContext.sideInput(Contextful.java:100)
        baeminbo.DoubleRunSideinputPipeline.lambda$main$38d062d6$1(DoubleRunSideinputPipeline.java:34)
        org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:143)
Caused by: java.lang.IllegalStateException: IsmSideInputReader only supports IsmReader as a reader but was AvroByteReader.
        org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
        org.apache.beam.runners.dataflow.worker.IsmSideInputReader.createReadersFromSources(IsmSideInputReader.java:253)
        org.apache.beam.runners.dataflow.worker.IsmSideInputReader.<init>(IsmSideInputReader.java:165)
        org.apache.beam.runners.dataflow.worker.IsmSideInputReader.of(IsmSideInputReader.java:278)
        org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.lambda$getSideInputReader$0(BatchModeExecutionContext.java:290)
        org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers$NonSerializableMemoizingSupplier.get(Suppliers.java:167)
        org.apache.beam.runners.dataflow.worker.LazilyInitializedSideInputReader.get(LazilyInitializedSideInputReader.java:50)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:267)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$1200(SimpleDoFnRunner.java:84)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:395)
        org.apache.beam.sdk.transforms.Contextful$Fn$Context$ContextFromProcessContext.sideInput(Contextful.java:100)
        baeminbo.DoubleRunSideinputPipeline.lambda$main$38d062d6$1(DoubleRunSideinputPipeline.java:34)
        org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:143)
        org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
        org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:339)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
        org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:420)
        org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:389)
        org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
        org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
        org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
        org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
        java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:834)

{code}
You can reproduce this issue with [this sample pipeline|https://github.com/baeminbo/dataflow-pipelines/tree/master/double-run-sideinput/].
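The linked sample is the actual reproduction. As a rough illustration only, here is a minimal sketch of the same shape, assuming the failing step looks like the `MapElements` + `Contextful` frames in the trace: a `MapElements` whose function reads a singleton side input, and a `main` that calls `run()` twice on the same `Pipeline` object. The class name `DoubleRunSideInputSketch`, the helper `buildPipeline`, and the element values are hypothetical and are not taken from the linked repository.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Requirements;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;

// Hypothetical sketch; not the code from the linked sample repository.
public class DoubleRunSideInputSketch {

  // Builds a pipeline whose MapElements step reads a singleton side input
  // through Contextful, similar in shape to the frames in the reported trace.
  public static Pipeline buildPipeline(PipelineOptions options) {
    Pipeline p = Pipeline.create(options);

    // Side input: a single hypothetical string turned into a singleton view.
    PCollectionView<String> suffix =
        p.apply("Suffix", Create.of("-suffix")).apply(View.asSingleton());

    // Main input: each element is concatenated with the side input value.
    p.apply("Main", Create.of("a", "b", "c"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    Contextful.fn(
                        (String s, Contextful.Fn.Context c) -> s + c.sideInput(suffix),
                        Requirements.requiresSideInputs(suffix))));
    return p;
  }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

    // Pattern described in BEAM-12618: run() called twice on the SAME Pipeline.
    // Per the report, with the Dataflow runner the second job fails on the worker
    // with "IsmSideInputReader only supports IsmReader ... but was AvroByteReader".
    Pipeline p = buildPipeline(options);
    p.run();
    p.run();
  }
}
```

To exercise the reported failure, run it with `--runner=DataflowRunner` plus the usual project, region, and temp-location options. If the same graph has to be launched twice, calling `buildPipeline` again for each launch avoids reusing a `Pipeline` object that has already been run; whether that sidesteps this particular error is an untested assumption, not something the report confirms.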



> Check state failed at IsmSideInputReader.createReadersFromSources
> -----------------------------------------------------------------
>
>                 Key: BEAM-12618
>                 URL: https://issues.apache.org/jira/browse/BEAM-12618
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Minbo Bae
>            Priority: P2
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
