Carsten Krebs created BEAM-3271:
-----------------------------------
Summary: BigQueryIO withFailedInsertRetryPolicy is endlessly retrying "invalid" rows
Key: BEAM-3271
URL: https://issues.apache.org/jira/browse/BEAM-3271
Project: Beam
Issue Type: Bug
Components: sdk-java-gcp
Affects Versions: 2.1.0
Environment: Google Cloud Dataflow
Reporter: Carsten Krebs
Assignee: Chamikara Jayalath
When streaming data into a BigQuery table with InsertRetryPolicy.retryTransientErrors(), BigQueryIO retries "invalid" rows endlessly instead of emitting them as failed inserts.
To quote Eugene Kirpichov [~kirpichov]:
bq. Upon talking to the BigQuery team, it became clear that this is indeed a bug in BigQueryIO. This error is not reported via InsertErrors because the InsertAll request specifies the table once rather than per row, and the table is invalid, so all rows in the batch are invalid. Beam should handle this.
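The failure mode quoted above can be modeled outside Beam: one insertAll request names the destination table once, so a request-level 400 with reason "invalid" applies to every row in the batch and arguably should be classified the same way a per-row "invalid" insert error would be. This is a minimal, self-contained sketch, not BigQueryIO's actual code. InsertErrorTriage and triage() are hypothetical, rows are plain strings, and the set of persistent reasons (invalid, invalidQuery, notImplemented) is an assumption modeled on what retryTransientErrors() is understood to treat as non-transient; check it against the Beam source for your version.

{code:language=java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper illustrating the classification; not part of BigQueryIO.
final class InsertErrorTriage {
  // Assumption: reasons treated as persistent, i.e. not worth retrying.
  static final Set<String> PERSISTENT_REASONS =
      new HashSet<>(Arrays.asList("invalid", "invalidQuery", "notImplemented"));

  /** Rows to retry, and rows to hand to getFailedInserts(). */
  static final class Result {
    final List<String> retry = new ArrayList<>();
    final List<String> failed = new ArrayList<>();
  }

  /**
   * @param rows         the batch sent in one insertAll request
   * @param requestError reason of a request-level error, or null if the request itself succeeded
   * @param rowErrors    per-row error reason (null = inserted); only read when requestError is null
   */
  static Result triage(List<String> rows, String requestError, List<String> rowErrors) {
    Result result = new Result();
    for (int i = 0; i < rows.size(); i++) {
      // A request-level error covers every row: the table is named once per request.
      String reason = requestError != null ? requestError : rowErrors.get(i);
      if (reason == null) {
        continue; // row was inserted
      }
      if (PERSISTENT_REASONS.contains(reason)) {
        result.failed.add(rows.get(i)); // give up on this row
      } else {
        result.retry.add(rows.get(i)); // transient, e.g. "backendError"
      }
    }
    return result;
  }
}
{code}

Under this classification, the out-of-range partition error in the trace below would send the whole batch to the failed-inserts output once, rather than retrying it indefinitely.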
{code:title=Code Snippet:|language=java}
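import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.util.Collections;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;

// p: the PCollection<TableRow> being written; DatePartitionedTableSpecifier is the reporter's own class
p.apply(BigQueryIO.writeTableRows()
        .to(new DatePartitionedTableSpecifier(tableReference, "tracking data"))
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()))
    // write all failed inserts to a DMQ
    .getFailedInserts()
    .apply(MapElements.via(new SimpleFunction<TableRow, PubsubMessage>() {
      @Override
      public PubsubMessage apply(final TableRow _row) {
        try {
          return new PubsubMessage(
              JacksonFactory.getDefaultInstance().toByteArray(_row),
              Collections.<String, String>emptyMap());
        } catch (IOException e) {
          throw new RuntimeException("failed to write to DMQ", e);
        }
      }
    }))
    .apply(PubsubIO.writeMessages().to("projects/gameduell-bits-bigquery-poc/topics/dmq"));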
{code}
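DatePartitionedTableSpecifier is the reporter's own class and is not shown in this issue. Judging by the partition named in the trace below (rum$20170925), it presumably maps each row to a daily partition decorator of the form table$yyyyMMdd. A hypothetical stand-in, assuming the partition day is derived from an event timestamp in UTC:

{code:language=java}
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical stand-in for DatePartitionedTableSpecifier, whose source is not shown.
final class PartitionDecorator {
  // BASIC_ISO_DATE renders a LocalDate as yyyyMMdd, e.g. 20170925.
  private static final DateTimeFormatter DAY = DateTimeFormatter.BASIC_ISO_DATE;

  /** Returns "table$yyyyMMdd" for the UTC calendar day of the event. */
  static String forEvent(String table, Instant eventTime) {
    LocalDate day = eventTime.atZone(ZoneOffset.UTC).toLocalDate();
    return table + "$" + day.format(DAY);
  }
}
{code}

With a mapping like this, a late-arriving event lands in an old partition such as rum$20170925, which is the kind of destination the trace below shows BigQuery rejecting.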
{code:title=Exception Trace:}
(1a04bdb0d43aca9c): java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:774)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:720)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:712)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
{code}
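The error message states the rule BigQuery enforced here: streaming inserts may only target partitions from 31 days in the past to 16 days in the future, relative to the current date. Until BigQueryIO routes such rows to the failed-inserts output, one possible workaround is to check the partition day before writing and divert out-of-window rows to the dead-letter topic directly. A minimal sketch, assuming StreamingWindow is a hypothetical helper, both limits are inclusive, and the caller supplies "today" in the same time zone used to pick the partition:

{code:language=java}
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Hypothetical pre-filter; the limits are taken from the error message above.
final class StreamingWindow {
  static final long MAX_DAYS_PAST = 31;
  static final long MAX_DAYS_FUTURE = 16;

  /** True if the partition day falls inside the streaming window (bounds assumed inclusive). */
  static boolean isStreamable(LocalDate partitionDay, LocalDate today) {
    // Negative offset = partition lies in the past relative to today.
    long offset = ChronoUnit.DAYS.between(today, partitionDay);
    return offset >= -MAX_DAYS_PAST && offset <= MAX_DAYS_FUTURE;
  }
}
{code}

Under the inclusive-bounds assumption, the partition from the trace (2017-09-25) stops being streamable on any current date after 2017-10-26.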
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)