damccorm opened a new issue, #19998:
URL: https://github.com/apache/beam/issues/19998
Run the repro with:
```shell
python repro.py \
  --project=CHANGEME \
  --runner=DataflowRunner \
  --temp_location=gs://change-me/bshi/tmp \
  --staging_location=gs://change-me/bshi/stg \
  --experiment=beam_fn_api \
  --save_main_session
```
The same repro code works with `--runner=Direct`. On Dataflow, it fails with the following error:
```
java.util.concurrent.ExecutionException: java.lang.ClassCastException: [B cannot be cast to org.apache.beam.sdk.values.KV
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:48)
    at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:87)
    at org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService$DeferredInboundDataClient.awaitCompletion(BeamFnDataGrpcService.java:134)
    at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortReadOperation.finish(RemoteGrpcPortReadOperation.java:83)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
    at org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
    at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
    at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
    at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
    at org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness.start(DataflowRunnerHarness.java:195)
    at org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness.main(DataflowRunnerHarness.java:123)
    Suppressed: java.lang.IllegalStateException: Already closed.
        at org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:93)
        at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.abort(RemoteGrpcPortWriteOperation.java:220)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:91)
        ... 6 more
Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.beam.sdk.values.KV
    at org.apache.beam.runners.dataflow.worker.ReifyTimestampAndWindowsParDoFnFactory$ReifyTimestampAndWindowsParDoFn.processElement(ReifyTimestampAndWindowsParDoFnFactory.java:72)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortReadOperation.consumeOutput(RemoteGrpcPortReadOperation.java:103)
    at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:78)
    at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:31)
    at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:138)
    at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
    at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
    at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
    at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
    at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
    at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
    at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
```
Contents of `repro.py`:
```python
import logging
import sys

import apache_beam
import apache_beam.io.gcp.bigquery
import apache_beam.io.gcp.bigquery_file_loads
import apache_beam.options.pipeline_options


def main():
    logging.getLogger().setLevel(logging.DEBUG)
    # Build pipeline options from the command-line flags (skipping the script name).
    options = apache_beam.options.pipeline_options.PipelineOptions(
        flags=sys.argv[1:]
    )
    with apache_beam.Pipeline(options=options) as p:
        (
            p
            | apache_beam.Create(
                [
                    {"some_str": "hello", "some_int": 1},
                    {"some_str": "world", "some_int": 2},
                ]
            )
            # Load the rows into <project>:bo_test.flow using BigQuery file loads.
            | apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads(
                options.view_as(
                    apache_beam.options.pipeline_options.GoogleCloudOptions
                ).project
                + ":bo_test.flow",
                create_disposition=apache_beam.io.gcp.bigquery.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=apache_beam.io.gcp.bigquery.BigQueryDisposition.WRITE_TRUNCATE,
                schema={
                    "fields": [
                        {"name": "some_str", "type": "STRING"},
                        {"name": "some_int", "type": "INTEGER"},
                    ]
                },
            )
        )


if __name__ == "__main__":
    main()
```
Imported from Jira [BEAM-9192](https://issues.apache.org/jira/browse/BEAM-9192). Original Jira may contain additional context.
Reported by: bshi.