If you are dealing with multiple different schemas in one field, figuring
out a strategy to deal with that will depend on your data and does not
really have much to do with Spark, since mapping your JSON payloads to
tractable data structures is a question of business logic.

The strategy of pulling the blob out into its own RDD and feeding it into
the JSON loader should work for any data source once you have your data
strategy figured out; a rough sketch for your Cassandra case is below.
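
Something like this (an untested sketch; the keyspace, table, and column
names are placeholders for yours, and it assumes the blobs are UTF-8
encoded JSON and the Cassandra connector is on your classpath):

from pyspark.sql.functions import col, decode

# load the Cassandra table (keyspace/table/column names are placeholders)
raw = (sqlContext.read
        .format('org.apache.spark.sql.cassandra')
        .options(keyspace='my_keyspace', table='my_table')
        .load())

# decode the blob column into JSON strings and pull them out as an RDD
jsonStrings = (raw
        .select(decode(col('payload'), 'UTF-8').alias('json'))
        .rdd.map(lambda row: row['json']))

# read.json unions the keys it sees across blobs with different structures,
# filling missing keys with null; if you would rather keep the structures
# separate, split the RDD first (e.g. on a discriminating key) and call
# read.json on each piece
parsed = sqlContext.read.json(jsonStrings)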

On Wed, Nov 16, 2016 at 4:39 PM, kant kodali <kanth...@gmail.com> wrote:

> 1. I have a Cassandra table where one of the columns is a blob, and this
> blob contains a JSON-encoded string. However, not all the blobs in that
> column have the same structure (some blobs contain different JSON than
> others), so in that case what is the best way to approach it? Do we need
> to group all the JSON blobs that have the same structure (same keys) into
> their own individual data frame? For example, if I have 5 JSON blobs that
> have the same structure and another 3 JSON blobs that belong to some other
> structure, do I need to create two data frames in this case? (Attached is
> a screenshot of 2 rows showing what my JSON looks like.)
> 2. In my case, toJSON on the RDD doesn't seem to help much. Attached is a
> screenshot; it looks like I got the same data frame as my original one.
>
> Thanks much for these examples.
>
>
>
> On Wed, Nov 16, 2016 at 2:54 PM, Nathan Lande <nathanla...@gmail.com>
> wrote:
>
>> I'm looking forward to 2.1 but, in the meantime, you can pull out the
>> specific column into an RDD of JSON objects, pass this RDD into the
>> read.json() and then join the results back onto your initial DF.
>>
>> Here is an example of what we do to unpack headers from Avro log data:
>>
>> from pyspark.sql.functions import col, concat, decode, lit
>>
>> def jsonLoad(path):
>>     #
>>     # load in the df
>>     raw = (sqlContext.read
>>             .format('com.databricks.spark.avro')
>>             .load(path)
>>         )
>>     #
>>     # define json blob, add primary key elements (hi and lo)
>>     #
>>     JSONBlob = concat(
>>         lit('{'),
>>         concat(lit('"lo":'), col('header.eventId.lo').cast('string'), lit(',')),
>>         concat(lit('"hi":'), col('header.eventId.hi').cast('string'), lit(',')),
>>         concat(lit('"response":'), decode('requestResponse.response', 'UTF-8')),
>>         lit('}')
>>     )
>>     #
>>     # extract the JSON blob as a string
>>     rawJSONString = raw.select(JSONBlob).rdd.map(lambda x: str(x[0]))
>>     #
>>     # transform the JSON string into a DF struct object
>>     structuredJSON = sqlContext.read.json(rawJSONString)
>>     #
>>     # join the structured JSON back onto the initial DF using the hi and lo join keys
>>     final = (raw.join(structuredJSON,
>>                 ((raw['header.eventId.lo'] == structuredJSON['lo']) &
>>                  (raw['header.eventId.hi'] == structuredJSON['hi'])),
>>                 'left_outer')
>>             .drop('hi')
>>             .drop('lo')
>>         )
>>     #
>>     # win
>>     return final
>>
>> On Wed, Nov 16, 2016 at 10:50 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> On Wed, Nov 16, 2016 at 2:49 AM, Hyukjin Kwon <gurwls...@gmail.com>
>>> wrote:
>>>
>>>> It sounds like you may be looking for the from_json/to_json functions,
>>>> once the blobs are en/decoded properly.
>>>>
>>>
>>> Which are new built-in functions that will be released with Spark 2.1.
>>>
>>
>>
>
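
For reference, once 2.1 is out, the from_json route mentioned above would
look roughly like this (an untested sketch; the column name json_blob and
the schema fields are placeholders for your own):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType

# schema of the JSON held in the blob column (fields are illustrative)
schema = StructType([
    StructField("lo", LongType()),
    StructField("hi", LongType()),
    StructField("response", StringType()),
])

# parse the JSON string column directly into a struct column, no RDD round trip
parsed = df.withColumn("parsed", from_json(col("json_blob"), schema))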
