[
https://issues.apache.org/jira/browse/BEAM-5335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17547216#comment-17547216
]
Kenneth Knowles commented on BEAM-5335:
---------------------------------------
This issue has been migrated to https://github.com/apache/beam/issues/18984
> [SQL] Output schema is not set correctly
> -----------------------------------------
>
> Key: BEAM-5335
> URL: https://issues.apache.org/jira/browse/BEAM-5335
> Project: Beam
> Issue Type: Bug
> Components: dsl-sql
> Reporter: Anton Kedin
> Priority: P3
>
> *From:
> https://stackoverflow.com/questions/52181795/how-do-i-get-an-output-schema-for-an-apache-beam-sql-query
> :*
> I've been playing with the Beam SQL DSL and I'm unable to use the output from
> a query without manually providing code that's aware of the output schema.
> Can I infer the output schema rather than hardcoding it?
> Neither the walkthrough nor the examples actually use the output from a query.
> I'm using Scio rather than the plain Java API to keep the code relatively
> readable and concise; I don't think that makes a difference for this question.
> Here's an example of what I mean.
> Given an input schema {{inSchema}} and some data source that is mapped onto a
> Row as follows (Avro-based in this example, but again, I don't think that
> matters):
> {code}
> sc.avroFile[Foo](args("input"))
> .map(fooToRow)
> .setCoder(inSchema.getRowCoder)
> .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
> .saveAsTextFile(args("output"))
> {code}
> Running this pipeline results in a KryoException as follows:
> {code}
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> fieldIndices (org.apache.beam.sdk.schemas.Schema)
> schema (org.apache.beam.sdk.values.RowWithStorage)
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> {code}
> However, inserting a RowCoder matching the SQL output, in this case a single
> int64 count column:
> {code}
> ...snip...
> .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
> .setCoder(Schema.builder().addInt64Field("count").build().getRowCoder)
> .saveAsTextFile(args("output"))
> {code}
> Now the pipeline runs just fine.
> Having to manually tell the pipeline how to encode the SQL output seems
> unnecessary, given that we specify the input schema/coder(s) and a query. It
> seems to me that we should be able to infer the output schema from that - but
> I can't see how, other than maybe using Calcite directly?
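> The gap can also be seen without any Beam setup. Below is a tiny
> self-contained Scala sketch (toy types, not Beam or Scio APIs) of why the
> aggregate needs a schema of its own: COUNT(1) produces a row shape that
> differs from the input, so the output schema has to be constructed
> somewhere, exactly as the setCoder call above does by hand.

```scala
// Toy model (NOT Beam or Scio APIs): illustrates why COUNT(1) needs its own
// output schema rather than inheriting the input one.
case class Field(name: String, fieldType: String)
case class Schema(fields: List[Field])
case class Row(schema: Schema, values: List[Any])

// Input rows all share the source schema...
val inSchema = Schema(List(Field("name", "STRING"), Field("score", "INT64")))
val input = List(
  Row(inSchema, List("a", 1L)),
  Row(inSchema, List("b", 2L))
)

// ...but the aggregate's result has a different shape, so its schema must be
// constructed explicitly somewhere, just like the setCoder workaround does.
def countRows(rows: List[Row]): Row = {
  val outSchema = Schema(List(Field("count", "INT64")))
  Row(outSchema, List(rows.size.toLong))
}

val result = countRows(input)
```

> In the real pipeline it's the SQL planner that already knows this output
> shape, which is why it seems like the transform should set it on the
> PCollection automatically instead of requiring the user to rebuild it.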
> Before raising a ticket on the Beam Jira, I thought I'd check I wasn't
> missing something obvious!
--
This message was sent by Atlassian Jira
(v8.20.7#820007)