[ 
https://issues.apache.org/jira/browse/BEAM-2404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080563#comment-16080563
 ] 

Andre commented on BEAM-2404:
-----------------------------

Sorry guys, got distracted with other stuff. It looks like the problem is 
triggered by the write step when the preceding read returns no rows.

For example, the following code runs fine when data is read (WHERE clause 
removed) but fails with the exception below when the query returns nothing.

{code:java}
PCollection<TableRow> rows = p.apply("ReadFromBQ", BigQueryIO.read()
   .fromQuery("SELECT * FROM [project:dataset.table] WHERE 1 = 2")
   .withoutResultFlattening());

rows.apply("WriteToBQ", BigQueryIO.writeTableRows()
   .to(targetTable).withSchema(mySchema)
   .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
   .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
{code}


{code:java}
SEVERE: java.lang.NullPointerException
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
        at com.project.MyClass.main(MyClass.java:128)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.processElement(WriteTables.java:97)
{code}


Now if I add a windowing strategy, the code doesn't fail anymore but never 
finishes even though no data is being read.

{code:java}
rows
        .apply("AddTimestamp", ParDo.of(new OrderAddTimestampDoFn()))
        .apply("WindowDaily", Window.<TableRow>into(CalendarWindows.days(1)))
        .apply("WriteToBQ", BigQueryIO.writeTableRows()
                .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
                        @Override
                        public TableDestination apply(ValueInSingleWindow<TableRow> value) {
                                String dayString = DateTimeFormat.forPattern("yyyyMMdd")
                                        .withZone(DateTimeZone.UTC)
                                        .print(((IntervalWindow) value.getWindow()).start());
                                TableDestination td = new TableDestination(targetTable + "$" + dayString, null);
                                return td;
                        }
                })
                .withSchema(mySchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
{code}
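
For reference, the destination function above appends a "$yyyyMMdd" partition 
decorator for the window's start day (in UTC) to the target table name. A 
minimal sketch of just that string computation, using java.time instead of the 
Joda-Time classes in the snippet. The class name PartitionDecorator, the method 
forWindowStart and the sample table name are hypothetical and not part of Beam:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not a Beam API): builds "<table>$<yyyyMMdd>" from a window start.
final class PartitionDecorator {
    // Formats an instant as a UTC calendar day, e.g. 20170602.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    private PartitionDecorator() {}

    // Returns the target table name with a daily partition decorator appended.
    static String forWindowStart(String targetTable, Instant windowStart) {
        return targetTable + "$" + DAY.format(windowStart);
    }

    public static void main(String[] args) {
        // A daily window that starts at midnight UTC on 2 June 2017.
        Instant windowStart = Instant.parse("2017-06-02T00:00:00Z");
        System.out.println(forWindowStart("project:dataset.table", windowStart));
    }
}
```

Run as is, this prints project:dataset.table$20170602, which is the shape of 
the table name that the windowed write targets for that day.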



> BigQueryIO reading stalls if no data is returned by query
> ---------------------------------------------------------
>
>                 Key: BEAM-2404
>                 URL: https://issues.apache.org/jira/browse/BEAM-2404
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-gcp
>    Affects Versions: 2.0.0
>            Reporter: Andre
>            Assignee: Stephen Sisk
>             Fix For: Not applicable
>
>
> When running a BigQueryIO query that doesn't return any rows (e.g. nothing 
> has changed in a delta job), the job seems to stall and nothing happens. No 
> temp files are being written, which I think might be what it is waiting for. 
> Just adding one row to the source table makes the job run through 
> successfully.
> Code:
> {code:java}
> PCollection <TableRow> rows = p.apply("ReadFromBQ",
>  BigQueryIO.read()
>  .fromQuery("SELECT * FROM `myproject.dataset.table`")
>  .withoutResultFlattening().usingStandardSql());
> {code}
>                       
> Log:
> {code:java}           
> Jun 02, 2017 9:00:36 AM org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl startJob
> INFO: Started BigQuery job: {jobId=beam_job_batch-query, projectId=my-project}.
> bq show -j --format=prettyjson --project_id=my-project beam_job_batch-query
> Jun 02, 2017 9:03:11 AM org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase executeExtract
> INFO: Starting BigQuery extract job: beam_job_batch-extract
> Jun 02, 2017 9:03:12 AM org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl startJob
> INFO: Started BigQuery job: {jobId=beam_job_batch-extract, projectId=my-project}.
> bq show -j --format=prettyjson --project_id=my-project beam_job_batch-extract
> Jun 02, 2017 9:04:06 AM org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase executeExtract
> INFO: BigQuery extract job completed: beam_job_batch-extract
> Jun 02, 2017 9:04:08 AM org.apache.beam.sdk.io.FileBasedSource expandFilePattern
> INFO: Matched 1 files for pattern gs://my-bucket/tmp/BigQueryExtractTemp/ff594d003c6440a1ad84b9e02858b5c6/000000000000.avro
> Jun 02, 2017 9:04:09 AM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
> INFO: Filepattern gs://my-bucket/tmp/BigQueryExtractTemp/ff594d003c6440a1ad84b9e02858b5c6/000000000000.avro matched 1 files with total size 9750
> {code}        



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
