Hello Ayan,

Thank you for the suggestion. But, I would lose correlation of the JSON
file with the other identifier fields. Also, if there are too many files,
will it be an issue? Plus, I may not have the same schema across all the
files.

Hello Enrico,

>how does RDD's mapPartitions make a difference regarding
I guess, in the question above I do have to process row-wise and RDD may be
more efficient?

Thanks,
Muthu

On Tue, 12 Jul 2022 at 14:55, ayan guha <guha.a...@gmail.com> wrote:

> Another option is:
>
> 1. collect the dataframe with file path
> 2. create a list of paths
> 3. create a new dataframe with spark.read.json and pass the list of path
>
> This will save you lots of headache
>
> Ayan
>
>
> On Wed, Jul 13, 2022 at 7:35 AM Enrico Minack <i...@enrico.minack.dev>
> wrote:
>
>> Hi,
>>
>> how does RDD's mapPartitions make a difference regarding 1. and 2.
>> compared to Dataset's mapPartitions / map function?
>>
>> Enrico
>>
>>
>> Am 12.07.22 um 22:13 schrieb Muthu Jayakumar:
>>
>> Hello Enrico,
>>
>> Thanks for the reply. I found that I would have to use `mapPartitions`
>> API of RDD to perform this safely as I have to
>> 1. Read each file from GCS using HDFS FileSystem API.
>> 2. Parse each JSON record in a safe manner.
>>
>> For (1) to work, I do have to broadcast HadoopConfiguration from
>> sparkContext. I did try to use GCS Java API to read content, but ran into
>> many JAR conflicts as the HDFS wrapper and the JAR library uses different
>> dependencies.
>> Hope this findings helps others as well.
>>
>> Thanks,
>> Muthu
>>
>>
>> On Mon, 11 Jul 2022 at 14:11, Enrico Minack <i...@enrico.minack.dev>
>> wrote:
>>
>>> All you need to do is implement a method readJson that reads a single
>>> file given its path. Than, you map the values of column file_path to
>>> the respective JSON content as a string. This can be done via an UDF or
>>> simply Dataset.map:
>>>
>>> case class RowWithJsonUri(entity_id: String, file_path: String,
>>> other_useful_id: String)
>>> case class RowWithJsonContent(entity_id: String, json_content: String,
>>> other_useful_id: String)
>>>
>>> val ds = Seq(
>>>   RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg",
>>> "gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json",
>>> "id-2-01g4he5cb4xqn6s1999k6y1vbd"),
>>>   RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf",
>>> "gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json",
>>> "id-2-01g4he5cbh52che104rwy603sr"),
>>>   RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5",
>>> "gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json",
>>> "id-2-01g4he5cbqmdv7dnx46sebs0gt"),
>>>   RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc",
>>> "gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json",
>>> "id-2-01g4he5cbx1kwhgvdme1s560dw")
>>> ).toDS()
>>>
>>> ds.show(false)
>>>
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>> |entity_id
>>> |file_path
>>> |other_useful_id                |
>>>
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>
>>> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>>>
>>> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>>>
>>> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>>>
>>> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>>
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>
>>>
>>> def readJson(uri: String): String = { s"content of $uri" }
>>>
>>> ds.map { row => RowWithJsonContent(row.entity_id,
>>> readJson(row.file_path), row.other_useful_id) }.show(false)
>>>
>>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>>> |entity_id
>>> |json_content
>>> |other_useful_id                |
>>>
>>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>>> |id-01f7pqqbxddb3b1an6ntyqx6mg|content of
>>> gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>>> |id-01f7pqgbwms4ajmdtdedtwa3mf|content of
>>> gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>>> |id-01f7pqqbxejt3ef4ap9qcs78m5|content of
>>> gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>>> |id-01f7pqqbynh895ptpjjfxvk6dc|content of
>>> gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>>
>>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>>>
>>> Cheers,
>>> Enrico
>>>
>>>
>>>
>>>
>>> Am 10.07.22 um 09:11 schrieb Muthu Jayakumar:
>>>
>>> Hello there,
>>>
>>> I have a dataframe with the following...
>>>
>>>
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>> |entity_id                    |file_path
>>>                          |other_useful_id                |
>>>
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>
>>> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>>>
>>> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>>>
>>> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>>>
>>> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>>
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>
>>> I would like to read each row from `file_path` and write the result to
>>> another dataframe containing `entity_id`, `other_useful_id`,
>>> `json_content`, `file_path`.
>>> Assume that I already have the required HDFS url libraries in my
>>> classpath.
>>>
>>> Please advice,
>>> Muthu
>>>
>>>
>>>
>>>
>>
>
> --
> Best Regards,
> Ayan Guha
>

Reply via email to