[ https://issues.apache.org/jira/browse/BEAM-3200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262089#comment-16262089 ]
AJ commented on BEAM-3200:
--------------------------
Here's a more complete stack trace:
{noformat}
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.processElement(StreamingSideInputDoFnRunner.java:71)
com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:84)
org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:180)
com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
com.google.cloud.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:55)
com.google.cloud.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:37)
com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:117)
com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:74)
com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:133)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:187)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1069)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
{noformat}
> Streaming Pipeline throws RuntimeException when using DynamicDestinations and
> Method.FILE_LOADS
> -----------------------------------------------------------------------------------------------
>
> Key: BEAM-3200
> URL: https://issues.apache.org/jira/browse/BEAM-3200
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-gcp
> Affects Versions: 2.2.0
> Reporter: AJ
> Assignee: Reuven Lax
> Priority: Critical
>
> I am trying to use Method.FILE_LOADS to load data into BigQuery from my
> streaming pipeline, using the RC3 release of 2.2.0. I am writing to around
> 500 tables using DynamicDestinations, and I am also using
> withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED). Everything works
> fine the first time the BigQuery load jobs are triggered. On subsequent
> triggers, however, the pipeline throws a RuntimeException about a table not
> being found, even though I created the pipeline with
> CreateDisposition.CREATE_IF_NEEDED. The exact exception is:
> {code}
> java.lang.RuntimeException: Failed to create load job with id prefix
> 717aed9ed1ef4aa7a616e1132f8b7f6d_a0928cae3d670b32f01ab2d9fe5cc0ee_00001_00001,
> reached max retries: 3, last failed load job: {
> "configuration" : {
> "load" : {
> "createDisposition" : "CREATE_NEVER",
> "destinationTable" : {
> "datasetId" : ...,
> "projectId" : ...,
> "tableId" : ....
> },
> "errors" : [ {
> "message" : "Not found: Table ....,
> "reason" : "notFound"
> } ],
> {code}
> My theory is that all subsequent load jobs are triggered with the
> CREATE_NEVER disposition, and that this might be due to
> https://github.com/apache/beam/blob/release-2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L140
> When using DynamicDestinations, not all destination tables may be known
> during the first trigger, and hence the pipeline's create disposition should
> be respected on every trigger.
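
The theory in the description can be illustrated with a small toy model. This is not Beam code and it does not reproduce WriteTables.java: every name in it (CreateDispositionModel, FakeBigQuery, suspected, proposed, table_a, table_b) is hypothetical, and it simply assumes, as the reporter suspects, that only the first pane uses the configured create disposition while later panes fall back to CREATE_NEVER. Under that assumption, a table that first receives data on the second trigger cannot be created.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the suspected behaviour; none of these names are Beam APIs. */
class CreateDispositionModel {
    enum CreateDisposition { CREATE_IF_NEEDED, CREATE_NEVER }

    /** Stand-in for BigQuery that only tracks which tables exist. */
    static final class FakeBigQuery {
        private final Set<String> tables = new HashSet<>();

        /** Returns true if the load succeeds, false for a "notFound" failure. */
        boolean load(String table, CreateDisposition disposition) {
            if (!tables.contains(table)) {
                if (disposition == CreateDisposition.CREATE_NEVER) {
                    return false; // table missing and creation not allowed
                }
                tables.add(table); // CREATE_IF_NEEDED creates the table
            }
            return true;
        }
    }

    /** Assumed (suspected) logic: only pane 0 honours the configured disposition. */
    static CreateDisposition suspected(long paneIndex, CreateDisposition configured) {
        return paneIndex == 0 ? configured : CreateDisposition.CREATE_NEVER;
    }

    /** Behaviour the reporter asks for: every pane honours the configured disposition. */
    static CreateDisposition proposed(long paneIndex, CreateDisposition configured) {
        return configured;
    }

    public static void main(String[] args) {
        CreateDisposition configured = CreateDisposition.CREATE_IF_NEEDED;
        FakeBigQuery bq = new FakeBigQuery();

        // First trigger (pane 0): only table_a has data so far, and it gets created.
        System.out.println(bq.load("table_a", suspected(0, configured))); // true
        // Second trigger (pane 1): table_b appears for the first time and fails.
        System.out.println(bq.load("table_b", suspected(1, configured))); // false
        // Same load with the configured disposition respected on every pane.
        System.out.println(bq.load("table_b", proposed(1, configured)));  // true
    }
}
```

In this model, tables that already exist keep loading normally on later triggers, so only destinations that first appear after the initial trigger would fail. That matches the symptom described above, but it has not been verified against the Beam source.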