Carsten Krebs created BEAM-3271:
-----------------------------------

             Summary: BigQueryIO withFailedInsertRetryPolicy is endlessly 
retrying "invalid" rows
                 Key: BEAM-3271
                 URL: https://issues.apache.org/jira/browse/BEAM-3271
             Project: Beam
          Issue Type: Bug
          Components: sdk-java-gcp
    Affects Versions: 2.1.0
         Environment: Google Cloud Dataflow
            Reporter: Carsten Krebs
            Assignee: Chamikara Jayalath


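Using InsertRetryPolicy.retryTransientErrors() when streaming data into a 
BigQuery table causes "invalid" rows to be retried endlessly. As the trace 
below shows, the 400 INVALID_ARGUMENT response is thrown as a RuntimeException 
from StreamingWriteFn.finishBundle rather than being reported per row. The 
retry policy therefore never gets to classify these rows, and they never reach 
getFailedInserts(); instead the whole bundle fails and is retried.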

To quote Eugene Kirpichov [~kirpichov]:
bq. Upon talking to the BigQuery team, it became clear that this is indeed a 
bug in BigQueryIO. This error is not reported via InsertErrors because the 
InsertAll request specifies the table once rather than per row, and the table 
is invalid, so all rows in the batch are invalid. Beam should handle this.

{code:title=Code Snippet:|language=java}
p.apply(BigQueryIO.writeTableRows()
                .to(new DatePartitionedTableSpecifier(tableReference, "tracking 
data"))
                .withSchema(schema)
                
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                // write all failed inserts to a DMQ
                .getFailedInserts().apply(MapElements.via(new 
SimpleFunction<TableRow, PubsubMessage>() {
public PubsubMessage apply(final TableRow _row) {
try {
return new PubsubMessage(JacksonFactory.getDefaultInstance().toByteArray(_row),
                        Collections.<String, String>emptyMap());
            } catch (IOException e) {
throw new RuntimeException("failed to write to DMQ", e);
            }
        }
})).apply(PubsubIO.writeMessages().to("projects/gameduell-bits-bigquery-poc/topics/dmq"));
{code}

{code:title=Exception Trace:}
(1a04bdb0d43aca9c): java.lang.RuntimeException: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:774)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
        
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
        
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 
400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
        
com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
        
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
        
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
        com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:720)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:712)
        java.util.concurrent.FutureTask.run(FutureTask.java:266)
        
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
java.lang.RuntimeException: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:774)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
        
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
        
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 
400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
        
com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
        
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
        
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
        com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:720)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:712)
        java.util.concurrent.FutureTask.run(FutureTask.java:266)
        
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
java.lang.RuntimeException: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:774)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
        
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
        
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 
400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
        
com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
        
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
        
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
        com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:720)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:712)
        java.util.concurrent.FutureTask.run(FutureTask.java:266)
        
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
java.lang.RuntimeException: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:774)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
        
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
        
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 
400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
        
com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
        
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
        
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
        com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:720)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:712)
        java.util.concurrent.FutureTask.run(FutureTask.java:266)
        
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
java.lang.RuntimeException: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:774)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
        
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
        
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 
400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
        
com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
        
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
        
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
        com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:720)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:712)
        java.util.concurrent.FutureTask.run(FutureTask.java:266)
        
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
java.lang.RuntimeException: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:774)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
        
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
        
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 
400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table's partition rum$20170925 is outside the 
allowed bounds. You can only stream to partitions within 31 days in the past 
and 16 days in the future relative to the current date.",
  "status" : "INVALID_ARGUMENT"
}
        
com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
        
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
        
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
        com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:720)
        
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:712)
        java.util.concurrent.FutureTask.run(FutureTask.java:266)
        
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
{code}




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to