There isn't a query per se. I'm writing the entire dataframe from the output
of the read stream. Once I got that working, I was planning to test the
query aspect.
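
Concretely, the pipeline is just read-then-write with nothing in between.
A minimal sketch, assuming the format string from my package; the table
option name is illustrative:

    val df = spark.readStream
      .format("com.samelamin.spark.bigquery")
      .option("table", "project:dataset.table")  // illustrative option
      .load()

    df.writeStream
      .format("com.samelamin.spark.bigquery")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()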


I'll do a bit more digging. Thank you very much for your help. Structured
streaming is very exciting and I really am enjoying writing a connector for
it!

Regards
Sam
On Thu, 2 Feb 2017 at 00:02, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> What is the query you are applying writeStream on? Essentially, can you
> print the whole query?
>
> Also, you can do StreamingQuery.explain() to see in full detail how the
> logical plan changes to the physical plan for a batch of data. That might
> help. Try doing that with some other sink to make sure the source works
> correctly, and then try using your sink.
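>
> For example, a minimal sketch, assuming `df` is the streaming Dataset
> returned by readStream (the console sink is a known-good stand-in):
>
>     val query = df.writeStream
>       .format("console")
>       .start()
>
>     // once a batch has run, this prints the logical and physical plans
>     query.explain(extended = true)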
>
> If you want further debugging, then you will have to dig into the
> StreamingExecution class in Spark, and debug stuff there.
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L523
>
> On Wed, Feb 1, 2017 at 3:49 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Yeah, sorry, I'm still working on it. It's on a branch you can find here
> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala>,
> ignore the logging messages; I was trying to work out how the APIs work,
> and unfortunately, because I have to shade the dependency, I can't debug it
> in an IDE (that I know of!).
>
> So I can see the correct schema here
> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala#L64>
> and also when the df is returned (after .load()).
>
> But when writeStream is applied to that same df, the dataframe that reaches
> addBatch has a new schema. It's similar to the old schema, but some ints
> have been turned into strings.
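>
> Roughly, the sink side is the following (a simplified sketch of my
> addBatch; the logger name is illustrative):
>
>     // BigQuerySink extends org.apache.spark.sql.execution.streaming.Sink
>     override def addBatch(batchId: Long, data: DataFrame): Unit = {
>       // data.schema here shows StringType where the source declared LongType
>       logger.warn(s"BigQuerySink received schema: ${data.schema}")
>       // write `data` out to BigQuery
>     }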
>
>
>
> On Wed, Feb 1, 2017 at 11:40 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
> I am assuming that you have written your own BigQuerySource (I don't see
> that code in the link you posted). In that source, you must have
> implemented getBatch, which uses offsets to return the DataFrame containing
> the data of a batch. Can you double-check whether the DataFrame returned by
> getBatch has the expected schema?
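>
> A quick way to check, as a sketch (assuming `source` is your
> BigQuerySource instance and getOffset currently returns an offset):
>
>     val end = source.getOffset.get
>     val batchDf = source.getBatch(None, end)
>     assert(batchDf.schema == source.schema,
>       s"getBatch schema ${batchDf.schema} != declared schema ${source.schema}")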
>
> On Wed, Feb 1, 2017 at 3:33 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Thanks for the quick response TD!
>
> I've been trying to identify where exactly this transformation happens.
>
> The readStream returns a dataframe with the correct schema.
>
> The minute I call writeStream, by the time I get to the addBatch method,
> the dataframe there has an incorrect schema.
>
> So I'm skeptical about the issue being prior to the readStream, since the
> output dataframe has the correct schema.
>
>
> Am I missing something completely obvious?
>
> Regards
> Sam
>
> On Wed, Feb 1, 2017 at 11:29 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
> You should make sure that the schema of the streaming Dataset returned by
> `readStream` matches the schema of the DataFrame returned by the source's
> getBatch.
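>
> Something like this, as a sketch (the format string is assumed from the
> package name in your repo):
>
>     val streamDf = spark.readStream
>       .format("com.samelamin.spark.bigquery")
>       .load()
>     println(streamDf.schema)  // what readStream declares
>     // this must equal the schema of the DataFrame your getBatch returns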
>
> On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Hi All
>
> I am writing a BigQuery connector here
> <http://github.com/samelamin/spark-bigquery> and I am getting a strange
> error with schemas being overwritten when a dataframe is passed over to the
> sink.
>
>
> For example, the source returns this StructType:
> WARN streaming.BigQuerySource:
> StructType(StructField(customerid,LongType,true),
>
> and the sink is receiving this StructType:
> WARN streaming.BigQuerySink:
> StructType(StructField(customerid,StringType,true)
>
>
> Any idea why this might be happening?
> I don't have schema inference on:
>
> spark.conf.set("spark.sql.streaming.schemaInference", "false")
>
> I know it's off by default, but I set it just to be sure.
>
> So I'm completely lost as to what could be causing this.
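>
> In the meantime I could cast the batch back inside the sink (a sketch;
> `sourceSchema` stands in for the StructType the source declared):
>
>     import org.apache.spark.sql.functions.col
>     val fixed = data.select(
>       sourceSchema.fields.map(f => col(f.name).cast(f.dataType)): _*)
>
> but I'd rather understand why the types change in the first place.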
>
> Regards
> Sam
>
