I'm looking forward to 2.1, but in the meantime you can pull the specific
column out into an RDD of JSON strings, pass that RDD into
sqlContext.read.json(), and then join the results back onto your initial DF.
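In its simplest form the pattern looks like this (a minimal sketch; df,
jsonCol, and the id join key are made-up names, and it assumes the blob
itself carries the key):

    from pyspark.sql.functions import col

    # pull the JSON column out as an RDD of strings
    jsonRDD = df.select(col('jsonCol')).rdd.map(lambda row: row[0])

    # let read.json() infer the schema and build a structured DF
    parsed = sqlContext.read.json(jsonRDD)

    # join the parsed struct back onto the original DF
    result = df.join(parsed, df['id'] == parsed['id'], 'left_outer')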
Here is an example of what we do to unpack headers from Avro log data:
    from pyspark.sql.functions import col, concat, decode, lit

    def jsonLoad(path):
        # load in the df
        raw = (sqlContext.read
               .format('com.databricks.spark.avro')
               .load(path))

        # define the json blob, adding primary key elements (hi and lo)
        JSONBlob = concat(
            lit('{'),
            concat(lit('"lo":'), col('header.eventId.lo').cast('string'),
                   lit(',')),
            concat(lit('"hi":'), col('header.eventId.hi').cast('string'),
                   lit(',')),
            concat(lit('"response":'), decode('requestResponse.response',
                                              'UTF-8')),
            lit('}')
        )

        # extract the JSON blob as a string
        rawJSONString = raw.select(JSONBlob).rdd.map(lambda x: str(x[0]))

        # transform the JSON string into a DF struct object
        structuredJSON = sqlContext.read.json(rawJSONString)

        # join the structured JSON back onto the initial DF using the hi and
        # lo join keys, then drop the temporary keys
        final = (raw.join(structuredJSON,
                          ((raw['header.eventId.lo'] == structuredJSON['lo']) &
                           (raw['header.eventId.hi'] == structuredJSON['hi'])),
                          'left_outer')
                 .drop('hi')
                 .drop('lo'))

        # win
        return final
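The hi/lo fields are only injected into the blob so there is something to
join on after read.json(), since the round trip through the RDD throws away
row alignment; they get dropped again at the end. Calling it is just (the
path here is made up):

    df = jsonLoad('/data/logs/2016-11-16/*.avro')
    df.printSchema()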
On Wed, Nov 16, 2016 at 10:50 AM, Michael Armbrust <[email protected]>
wrote:
> On Wed, Nov 16, 2016 at 2:49 AM, Hyukjin Kwon <[email protected]> wrote:
>
>> Maybe it sounds like you are looking for from_json/to_json functions
>> after en/decoding properly.
>>
>
> Which are new built-in functions that will be released with Spark 2.1.
>
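For anyone finding this thread later: with the 2.1 built-ins the same parse
becomes a single column expression, roughly like this (a sketch; from_json
takes the column plus an explicit StructType, and the column and schema
names here are made up):

    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import StructType, StructField, StringType

    schema = StructType([StructField('response', StringType())])

    # parse the string column directly into a struct column,
    # no RDD round trip or join keys needed
    parsed = df.withColumn('parsed', from_json(col('jsonCol'), schema))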