What is the query you are applying writeStream to? Essentially, can you
print the whole query?

Also, you can call StreamingQuery.explain() to see in full detail how the
logical plan is turned into the physical plan for a batch of data. That
might help. Try doing that with some other sink to make sure the source
works correctly, and then try using your sink.
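
For what it's worth, here is a rough sketch of that check, not your actual
setup. It assumes a local SparkSession, uses MemoryStream (an internal Spark
class normally used in tests) as a stand-in for your BigQuerySource, and uses
the built-in console sink in place of your sink. The object and method names
are made up for illustration, and the MemoryStream signature may differ
between Spark versions.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.types.StructType

// Hypothetical helper object; the name is not from any library.
object ExplainSketch {
  def run(spark: SparkSession): StructType = {
    import spark.implicits._
    // MemoryStream needs an implicit SQLContext in scope.
    implicit val sqlContext: SQLContext = spark.sqlContext

    // Stand-in source (assumption): replace with readStream.format(...).load()
    // for the source you are actually debugging.
    val input = MemoryStream[Long]
    input.addData(1L, 2L, 3L)
    val df = input.toDF()

    // Schema as seen right after the source is loaded.
    df.printSchema()

    // Use a known-good sink first, so any schema change can be
    // attributed to the source rather than to the custom sink.
    val query = df.writeStream.format("console").start()
    try {
      // Wait until the data added above has been processed as a batch.
      query.processAllAvailable()
      // Print the logical and physical plans of the last executed batch.
      query.explain(true)
    } finally {
      query.stop()
    }
    df.schema
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("explain-sketch")
      .getOrCreate()
    try {
      run(spark)
      ()
    } finally {
      spark.stop()
    }
  }
}
```

If the plans and schema look right with the console sink, swapping in your
own sink narrows the problem down to the sink side.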

If you want to debug further, then you will have to dig into the
StreamExecution class in Spark and debug things there.
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L523

On Wed, Feb 1, 2017 at 3:49 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:

> Yeah, sorry, I'm still working on it. It's on a branch you can find here
> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala>.
> Ignore the logging messages; I was trying to work out how the APIs work, and
> unfortunately, because I have to shade the dependency, I can't debug it in an
> IDE (that I know of!).
>
> So I can see the correct schema here
> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala#L64>
> and
> also when the df is returned (after .load()).
>
> But when that same df has writeStream applied to it, the addBatch
> dataframe has a new schema. It's similar to the old schema, but some ints
> have been turned into strings.
>
>
>
> On Wed, Feb 1, 2017 at 11:40 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> I am assuming that you have written your own BigQuerySource (I don't see
>> that code in the link you posted). In that source, you must have
>> implemented getBatch, which uses offsets to return the DataFrame holding
>> the data of a batch. Can you double-check whether the DataFrame returned
>> by getBatch has the expected schema?
>>
>> On Wed, Feb 1, 2017 at 3:33 PM, Sam Elamin <hussam.ela...@gmail.com>
>> wrote:
>>
>>> Thanks for the quick response TD!
>>>
>>> I've been trying to identify where exactly this transformation happens.
>>>
>>> The readStream returns a dataframe with the correct schema
>>>
>>> As soon as I call writeStream, by the time I get to the addBatch method,
>>> the dataframe there has an incorrect schema.
>>>
>>> So I'm skeptical about the issue being prior to the readStream, since the
>>> output dataframe has the correct schema.
>>>
>>>
>>> Am I missing something completely obvious?
>>>
>>> Regards
>>> Sam
>>>
>>> On Wed, Feb 1, 2017 at 11:29 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> You should make sure that the schema of the streaming Dataset returned
>>>> by `readStream` and the schema of the DataFrame returned by the source's
>>>> getBatch are the same.
>>>>
>>>> On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin <hussam.ela...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All
>>>>>
>>>>> I am writing a BigQuery connector here
>>>>> <http://github.com/samelamin/spark-bigquery> and I am getting a
>>>>> strange error with schemas being overwritten when a dataframe is passed
>>>>> over to the Sink.
>>>>>
>>>>>
>>>>> For example, the source returns this StructType:
>>>>> WARN streaming.BigQuerySource: StructType(StructField(customerid,LongType,true),
>>>>>
>>>>> and the sink is receiving this StructType:
>>>>> WARN streaming.BigQuerySink: StructType(StructField(customerid,StringType,true)
>>>>>
>>>>>
>>>>> Any idea why this might be happening?
>>>>> I don't have schema inference turned on:
>>>>>
>>>>> spark.conf.set("spark.sql.streaming.schemaInference", "false")
>>>>>
>>>>> I know it's off by default, but I set it just to be sure.
>>>>>
>>>>> So I'm completely lost as to what could be causing this.
>>>>>
>>>>> Regards
>>>>> Sam
>>>>>
>>>>
>>>>
>>>
>>
>
