Yeah, sorry, I'm still working on it. It's on a branch you can find here
<https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala>.
Ignore the logging messages; I was trying to work out how the APIs work, and
unfortunately, because I have to shade the dependency, I can't debug it in an
IDE (that I know of!).

So I can see the correct schema here
<https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala#L64>
and also when the df is returned (after .load()).

But when that same df has writeStream applied to it, the DataFrame passed to
addBatch has a different schema. It's similar to the original schema, but some
ints have been turned into strings.
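
To pin down exactly which fields change between the source and the sink, a
small schema diff can be logged from both places. This is only a minimal
sketch, assuming spark-sql is on the classpath; SchemaDiff and its diff method
are hypothetical helpers and are not part of the connector or of Spark. It
compares top-level fields only and uses the customerid field from the log
output as sample data.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper (not part of spark-bigquery or Spark itself).
object SchemaDiff {
  // For every top-level field in `expected`, report (name, expectedType, actualType)
  // when the type differs or the field is absent from `actual`.
  def diff(expected: StructType, actual: StructType): Seq[(String, String, String)] = {
    val actualByName = actual.fields.map(f => f.name -> f.dataType).toMap
    expected.fields.toSeq.flatMap { f =>
      actualByName.get(f.name) match {
        case Some(dt) if dt == f.dataType => None
        case Some(dt) => Some((f.name, f.dataType.simpleString, dt.simpleString))
        case None     => Some((f.name, f.dataType.simpleString, "<missing>"))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Sample schemas mirroring the two log lines: LongType at the source, StringType at the sink.
    val fromSource = StructType(Seq(StructField("customerid", LongType, nullable = true)))
    val inSink     = StructType(Seq(StructField("customerid", StringType, nullable = true)))
    diff(fromSource, inSink).foreach { case (name, exp, act) =>
      println(s"$name: expected $exp, got $act")
    }
  }
}
```

Calling something like SchemaDiff.diff(sourceSchema, data.schema) inside
addBatch (where sourceSchema is whatever the source reported) would list only
the fields whose types changed, rather than having to compare two full
StructType dumps by eye.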



On Wed, Feb 1, 2017 at 11:40 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> I am assuming that you have written your own BigQuerySource (I don't see
> that code in the link you posted). In that source, you must have
> implemented getBatch, which uses offsets to return the DataFrame holding the
> data of a batch. Can you double-check whether the DataFrame returned by
> getBatch has the expected schema?
>
> On Wed, Feb 1, 2017 at 3:33 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
>> Thanks for the quick response TD!
>>
>> I've been trying to identify where exactly this transformation happens.
>>
>> The readStream returns a DataFrame with the correct schema.
>>
>> Once I call writeStream, by the time I get to the addBatch method,
>> the DataFrame there has an incorrect schema.
>>
>> So I'm skeptical that the issue occurs before readStream, since the
>> DataFrame it returns has the correct schema.
>>
>>
>> Am I missing something completely obvious?
>>
>> Regards
>> Sam
>>
>> On Wed, Feb 1, 2017 at 11:29 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> You should make sure that the schema of the streaming Dataset returned by
>>> `readStream` matches the schema of the DataFrame returned by the source's
>>> getBatch.
>>>
>>> On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>>> Hi All
>>>>
>>>> I am writing a BigQuery connector here
>>>> <http://github.com/samelamin/spark-bigquery> and I am getting a
>>>> strange error where the schema is overwritten when a DataFrame is passed
>>>> over to the Sink.
>>>>
>>>>
>>>> For example, the source returns this StructType:
>>>> WARN streaming.BigQuerySource: StructType(StructField(customerid,LongType,true),
>>>>
>>>> and the sink is receiving this StructType:
>>>> WARN streaming.BigQuerySink: StructType(StructField(customerid,StringType,true)
>>>>
>>>>
>>>> Any idea why this might be happening?
>>>> I don't have schema inference turned on:
>>>>
>>>> spark.conf.set("spark.sql.streaming.schemaInference", "false")
>>>>
>>>> I know it's off by default, but I set it just to be sure.
>>>>
>>>> So I'm completely lost as to what could be causing this.
>>>>
>>>> Regards
>>>> Sam
>>>>
>>>
>>>
>>
>
