[ https://issues.apache.org/jira/browse/BEAM-9192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chun Yang updated BEAM-9192: ---------------------------- Affects Version/s: 2.16.0 2.19.0 > BigQuery IO on Dataflow runner fails (java.lang.ClassCastException) with > --experiment=beam_fn_api > ------------------------------------------------------------------------------------------------- > > Key: BEAM-9192 > URL: https://issues.apache.org/jira/browse/BEAM-9192 > Project: Beam > Issue Type: Bug > Components: sdk-py-core > Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0 > Reporter: Bo Shi > Priority: Major > > {noformat} > python repro.py \ > --project=CHANGEME \ > --runner=DataflowRunner \ > --temp_location=gs://change-me/bshi/tmp \ > --staging_location=gs://change-me/bshi/stg \ > --experiment=beam_fn_api \ > --save_main_function > {noformat} > The same repro code works with --runner=Direct. On Dataflow, the error is > {noformat} > java.util.concurrent.ExecutionException: java.lang.ClassCastException: [B > cannot be cast to org.apache.beam.sdk.values.KV > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:48) > at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:87) > at > org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService$DeferredInboundDataClient.awaitCompletion(BeamFnDataGrpcService.java:134) > at > org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortReadOperation.finish(RemoteGrpcPortReadOperation.java:83) > at > org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) > at > org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125) > at > org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411) > 
at > org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380) > at > org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305) > at > org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness.start(DataflowRunnerHarness.java:195) > at > org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness.main(DataflowRunnerHarness.java:123) > Suppressed: java.lang.IllegalStateException: Already closed. > at > org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:93) > at > org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.abort(RemoteGrpcPortWriteOperation.java:220) > at > org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:91) > ... 6 more > Caused by: java.lang.ClassCastException: [B cannot be cast to > org.apache.beam.sdk.values.KV > at > org.apache.beam.runners.dataflow.worker.ReifyTimestampAndWindowsParDoFnFactory$ReifyTimestampAndWindowsParDoFn.processElement(ReifyTimestampAndWindowsParDoFnFactory.java:72) > at > org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44) > at > org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49) > at > org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortReadOperation.consumeOutput(RemoteGrpcPortReadOperation.java:103) > at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:78) > at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:31) > at > org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:138) > at > org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125) > at > 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249) > at > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) > at > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) > at > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297) > at > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738) > at > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > at > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > {noformat} > > {code:python} > import logging > import sys > import apache_beam > import apache_beam.io.gcp.bigquery_file_loads > def main(): > logging.getLogger().setLevel(logging.DEBUG) > options = > apache_beam.options.pipeline_options.PipelineOptions(flags=sys.argv) > with apache_beam.Pipeline(options=options) as p: > ( > p > | apache_beam.Create( > [ > {"some_str": "hello", "some_int": 1,}, > {"some_str": "world", "some_int": 2,}, > ] > ) > | apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads( > options.view_as( > apache_beam.options.pipeline_options.GoogleCloudOptions > ).project > + ":bo_test.flow", > > create_disposition=apache_beam.io.gcp.bigquery.BigQueryDisposition.CREATE_IF_NEEDED, > > write_disposition=apache_beam.io.gcp.bigquery.BigQueryDisposition.WRITE_TRUNCATE, > schema={ > "fields": [ > {"name": "some_str", 
"type": "STRING"}, > {"name": "some_int", "type": "INTEGER"}, > ] > }, > ) > ) > if __name__ == "__main__": > main() > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)