kennknowles opened a new issue, #18984:
URL: https://github.com/apache/beam/issues/18984

   *From: 
https://stackoverflow.com/questions/52181795/how-do-i-get-an-output-schema-for-an-apache-beam-sql-query
 :*
   
   I've been playing with the Beam SQL DSL and I'm unable to use the output 
from a query without providing a code that's aware of the output schema 
manually. Can I infer the output schema rather than hardcoding it?
   
   Neither the walkthrough or the examples actually use the output from a 
query. I'm using Scio rather than the plain Java API to keep the code 
relatively readable and concise, I don't think that makes a difference for this 
question.
   
   Here's an example of what I mean.
   
   Given an input schema inSchema and some data source that is mapped onto a 
Row as follows: (in this example, Avro-based, but again, I don't think that 
matters):
   
   ```
   
   sc.avroFile[Foo](args("input"))
      .map(fooToRow)
      .setCoder(inSchema.getRowCoder)
      .applyTransform(SqlTransform.query("SELECT
   COUNT(1) FROM PCOLLECTION"))
      .saveAsTextFile(args("output"))
   
   ```
   
   
   Running this pipeline results in a KryoException as follows:
   
   ```
   
   com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
   Serialization trace:
   fieldIndices
   (org.apache.beam.sdk.schemas.Schema)
   schema (org.apache.beam.sdk.values.RowWithStorage)
   org.apache.beam.sdk.Pipeline$PipelineExecutionException:
   
   com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
   
   ```
   
   
   However, inserting a RowCoder matching the SQL output, in this case a single 
count int column:
   
   ```
   
      ...snip...
      .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
      .setCoder(Schema.builder().addInt64Field("count").build().getRowCoder)
   
     .saveAsTextFile(args("output"))
   
   ```
   
   
   Now the pipeline runs just fine.
   
   Having to manually tell the pipeline how to encode the SQL output seems 
unnecessary, given that we specify the input schema/coder(s) and a query. It 
seems to me that we should be able to infer the output schema from that - but I 
can't see how, other than maybe using Calcite directly?
   
   Before raising a ticket on the Beam Jira, I thought I'd check I wasn't 
missing something obvious!
   
   
   
   Imported from Jira 
[BEAM-5335](https://issues.apache.org/jira/browse/BEAM-5335). Original Jira may 
contain additional context.
   Reported by: kedin.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to