Jonny Daenen created BEAM-12721:
-----------------------------------

             Summary: NullPointerException when streaming to BigQuery
                 Key: BEAM-12721
                 URL: https://issues.apache.org/jira/browse/BEAM-12721
             Project: Beam
          Issue Type: Bug
          Components: io-java-gcp
    Affects Versions: 2.28.0
            Reporter: Jonny Daenen


Setup:
 * Streaming Dataflow pipeline
 * BigQuery sink
 * Retry policy set to retry transient errors

*Observed errors* at the Dataflow job level (not at the worker level):

Stack trace:
```
java.lang.RuntimeException: java.lang.NullPointerException
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:954)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:994)
        at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:375)
        at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$800(BatchedStreamingWrite.java:69)
        at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:271)
Caused by: java.lang.NullPointerException
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.recordError(BigQueryServicesImpl.java:1020)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll$1(BigQueryServicesImpl.java:880)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$BoundedExecutorService$SemaphoreCallable.call(BigQueryServicesImpl.java:1342)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
```
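The outer `RuntimeException: java.lang.NullPointerException` line, together with a `Caused by` trace that bottoms out in `Thread.run`, is consistent with a task failing on a thread-pool worker and the calling thread rethrowing that failure. The sketch below reproduces this general pattern with plain `java.util.concurrent`; `WrappedTaskFailure` and `runAndUnwrap` are hypothetical names, and this is an illustration of the pattern, not Beam's actual `insertAll` code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WrappedTaskFailure {
    /** Hypothetical helper: runs a task on a worker thread and rethrows any failure wrapped in a RuntimeException. */
    static <T> T runAndUnwrap(Callable<T> task) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = pool.submit(task);
            return future.get();
        } catch (ExecutionException e) {
            // new RuntimeException(cause) uses cause.toString() as its message, which is why
            // the outer line of the trace starts with "java.lang.RuntimeException: java.lang.NullPointerException".
            throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        try {
            runAndUnwrap(() -> {
                Object details = null;     // stands in for a missing error payload
                return details.hashCode(); // throws NullPointerException on the worker thread
            });
        } catch (RuntimeException e) {
            System.out.println(e);
            System.out.println(e.getCause().getClass().getName()); // java.lang.NullPointerException
        }
    }
}
```

Because the exception is thrown on the pool thread, its own stack trace ends in `Thread.run`, while the wrapper's trace shows the caller, matching the two halves of the trace above.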
 

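*Location of issue:*

`org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl`, line 1020 (in `recordError`, per the trace above):

`int errorCode = ((GoogleJsonResponseException) e).getDetails().getCode();`

The most likely cause is that `GoogleJsonResponseException.getDetails()` returns `null` when the error response carries no parsed JSON error details, so the chained `getCode()` call throws the `NullPointerException`.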
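A null-safe version of that lookup might look like the following sketch. To keep it self-contained, `JsonError` and `JsonResponseException` are hypothetical stand-ins for `GoogleJsonError` and `GoogleJsonResponseException` (in the real library, `getStatusCode()` is inherited from `HttpResponseException`). Falling back to the HTTP status code when details are missing is an assumption about reasonable behavior, not the fix adopted in Beam.

```java
import java.util.Optional;

public class NullSafeErrorCode {
    /** Hypothetical stand-in for GoogleJsonError. */
    static final class JsonError {
        private final int code;
        JsonError(int code) { this.code = code; }
        int getCode() { return code; }
    }

    /** Hypothetical stand-in for GoogleJsonResponseException. */
    static final class JsonResponseException extends RuntimeException {
        private final int statusCode;
        private final JsonError details; // may be null, e.g. when the response body is not JSON
        JsonResponseException(int statusCode, JsonError details) {
            super("HTTP " + statusCode);
            this.statusCode = statusCode;
            this.details = details;
        }
        int getStatusCode() { return statusCode; }
        JsonError getDetails() { return details; }
    }

    /** Prefers the JSON error code; falls back to the HTTP status code when details are missing. */
    static int errorCode(JsonResponseException e) {
        return Optional.ofNullable(e.getDetails())
                .map(JsonError::getCode)
                .orElse(e.getStatusCode());
    }

    public static void main(String[] args) {
        System.out.println(errorCode(new JsonResponseException(503, new JsonError(503)))); // 503
        System.out.println(errorCode(new JsonResponseException(502, null)));              // 502, no NPE
    }
}
```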


The same code path also produces *connection resets*, hundreds of times per day:
```
java.lang.RuntimeException: javax.net.ssl.SSLException: Connection reset
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:954)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:994)
        at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:375)
        at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$800(BatchedStreamingWrite.java:69)
        at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:271)
Caused by: javax.net.ssl.SSLException: Connection reset
        at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:127)
        at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:350)
        at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:293)
        at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:288)
        at java.base/sun.security.ssl.SSLSocketImpl.handleException(SSLSocketImpl.java:1581)
        at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:979)
        at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252)
        at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:292)
        at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
        at java.base/sun.net.www.http.ChunkedInputStream.readAheadBlocking(ChunkedInputStream.java:552)
        at java.base/sun.net.www.http.ChunkedInputStream.readAhead(ChunkedInputStream.java:609)
        at java.base/sun.net.www.http.ChunkedInputStream.read(ChunkedInputStream.java:696)
        at java.base/java.io.FilterInputStream.read(FilterInputStream.java:133)
        at java.base/sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3510)
        at com.google.api.client.http.javanet.NetHttpResponse$SizeValidatingInputStream.read(NetHttpResponse.java:164)
        at java.base/java.io.FilterInputStream.read(FilterInputStream.java:133)
        at java.base/java.io.FilterInputStream.read(FilterInputStream.java:107)
        at com.google.common.io.ByteStreams.exhaust(ByteStreams.java:274)
        at com.google.api.client.http.ConsumingInputStream.close(ConsumingInputStream.java:40)
        at java.base/java.util.zip.InflaterInputStream.close(InflaterInputStream.java:232)
        at java.base/java.util.zip.GZIPInputStream.close(GZIPInputStream.java:137)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._closeInput(UTF8StreamJsonParser.java:252)
        at com.fasterxml.jackson.core.base.ParserBase.close(ParserBase.java:369)
        at com.google.api.client.json.jackson2.JacksonParser.close(JacksonParser.java:48)
        at com.google.api.client.json.JsonParser.parse(JsonParser.java:363)
        at com.google.api.client.json.JsonParser.parse(JsonParser.java:335)
        at com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:79)
        at com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:73)
        at com.google.api.client.http.HttpResponse.parseAs(HttpResponse.java:456)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll$1(BigQueryServicesImpl.java:878)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$BoundedExecutorService$SemaphoreCallable.call(BigQueryServicesImpl.java:1342)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
        Suppressed: java.net.SocketException: Broken pipe (Write failed)
                at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
                at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
                at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
                at java.base/sun.security.ssl.SSLSocketOutputRecord.encodeAlert(SSLSocketOutputRecord.java:81)
                at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:381)
                ... 34 more
Caused by: java.net.SocketException: Connection reset
        at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
        at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
        at java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:476)
        at java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:470)
        at java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
        at java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1354)
        at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:963)
        ... 30 more
```
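This second trace is a three-level cause chain: a `RuntimeException` wrapping an `SSLException`, which in turn wraps `SocketException: Connection reset`. Code that wants to treat such failures as transient has to look past the outer wrapper. A minimal sketch of walking the chain with `Throwable.getCause()` follows; `CauseChain`, `rootCause`, and `hasCause` are hypothetical helpers, and treating a `SocketException` anywhere in the chain as transient is an assumption, not Beam's retry logic.

```java
import java.net.SocketException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import javax.net.ssl.SSLException;

public class CauseChain {
    /** Follows getCause() to the innermost throwable, stopping early if a cycle is detected. */
    static Throwable rootCause(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    /** Returns true if any throwable in the cause chain is an instance of the given type. */
    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable c = t; c != null && seen.add(c); c = c.getCause()) {
            if (type.isInstance(c)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Rebuild the chain from the trace: RuntimeException -> SSLException -> SocketException.
        SocketException reset = new SocketException("Connection reset");
        RuntimeException wrapped = new RuntimeException(new SSLException("Connection reset", reset));
        System.out.println(rootCause(wrapped).getMessage());               // Connection reset
        System.out.println(hasCause(wrapped, SocketException.class));      // true
        System.out.println(hasCause(wrapped, NullPointerException.class)); // false
    }
}
```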


*Open questions*

- Does this error lead to data loss? (It surfaces as a job-level error rather than a worker-level error in Dataflow.)
- Is there a setting that can mitigate this?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)