What do you mean by overkill here?

I tried the approach below to iterate over the 4k records inside a while loop.
However, it runs only for the first record. What could be wrong here? I went
through a few SO posts where users found the approach below faster than the
withColumn approach:

finalDF = finalDF.select("meta").rdd.map(
    lambda x: call_to_cust_bulk_api(policyUrl, x[0])).toDF()
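
One possible explanation, offered as an assumption rather than a confirmed diagnosis: map is lazy, and toDF() without an explicit schema only evaluates enough of the RDD to infer the column types, so the API is hit for the first record and nothing more happens until an action such as count() or collect() runs. Note also that map sends one record per call, not a 4k batch. Below is a minimal sketch of a per-partition function that groups rows into batches before posting. The names send_partition, post_batch, fake_post and batch_size are hypothetical; post_batch stands in for a wrapper around call_to_cust_bulk_api.

```python
from itertools import islice
from typing import Any, Callable, Dict, Iterable, Iterator, List


def send_partition(
    rows: Iterable[Any],
    post_batch: Callable[[List[Any]], Dict[str, Any]],
    batch_size: int = 4000,
) -> Iterator[Dict[str, Any]]:
    """Group one partition's rows into batches and post each batch once."""
    iterator = iter(rows)
    while True:
        # Take up to batch_size rows without materialising the whole partition.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield post_batch(batch)


# Local check with a stand-in for the real API call (no network involved).
def fake_post(batch: List[Any]) -> Dict[str, Any]:
    return {"received": len(batch)}


responses = list(send_partition(range(10), fake_post, batch_size=4))
print(responses)  # [{'received': 4}, {'received': 4}, {'received': 2}]
```

With Spark this could be wired up as finalDF.select("meta").rdd.mapPartitions(lambda rows: send_partition(rows, post_batch)), followed by an action such as collect() so that every partition is actually processed.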


On Mon, Jun 13, 2022 at 4:13 PM Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
> >> spark.range(10000).createOrReplaceTempView("test")
> >> maximum_records_per_api_call = 40
> >> batch_count = spark.sql("SELECT * FROM test").count() // maximum_records_per_api_call
> >> spark.sql(f"SELECT id, mod(monotonically_increasing_id(), {batch_count}) batch_id FROM test").repartitionByRange("batch_id").createOrReplaceTempView("test_batch")
>
>
> The above code should then be runnable with a UDF, as long as we are
> able to control the parallelism through the executor count and task
> CPU configuration.
>
> But once again, this is just unnecessary overkill.
>
>
> Regards,
> Gourav Sengupta
>
> On Mon, Jun 13, 2022 at 10:41 AM Sid <flinkbyhe...@gmail.com> wrote:
>
>> Hi Gourav,
>>
>> Could you please provide me with some examples?
>>
>> On Mon, Jun 13, 2022 at 2:23 PM Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> try using the mod of a monotonically increasing field and then the
>>> repartitionByRange function, and see whether Spark automatically serialises
>>> it based on the number of executors that you assign to the job.
>>>
>>> But once again, this is kind of overkill; for fetching data from an
>>> API, a simple Python program works quite well.
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Mon, Jun 13, 2022 at 9:28 AM Sid <flinkbyhe...@gmail.com> wrote:
>>>
>>>> Hi Gourav,
>>>>
>>>> Do you have any examples or links, please? That would help me to
>>>> understand.
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>> On Mon, Jun 13, 2022 at 1:42 PM Gourav Sengupta <
>>>> gourav.sengu...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I think that serialising data using Spark is overkill; why not use
>>>>> plain Python?
>>>>>
>>>>> Also, have you tried repartitioning by range? That way you can use the
>>>>> modulus operator to batch things up.
>>>>>
>>>>> Regards,
>>>>> Gourav
>>>>>
>>>>>
>>>>> On Mon, Jun 13, 2022 at 8:37 AM Sid <flinkbyhe...@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> I am trying to hit POST APIs for the very first time using
>>>>>> PySpark.
>>>>>>
>>>>>> My end goal is something like the below:
>>>>>>
>>>>>>
>>>>>>    1. Generate the data.
>>>>>>    2. Send the data in batches of 4k records, since the API can
>>>>>>    accept 4k records at once.
>>>>>>    3. Each record would look like the below:
>>>>>>
>>>>>>    {
>>>>>>        "Token": "",
>>>>>>        "CustomerName": "",
>>>>>>        "Object": "",
>>>>>>        "Data": [{"A":"1"},{"A":"2"}]
>>>>>>    }
>>>>>>
>>>>>>    4. The token will be generated first and then passed to the
>>>>>>    'Token' key in the data.
>>>>>>
>>>>>> For the above goal, I initially wrote something like the below, which
>>>>>> gives a heap error because the data frame is getting created on the
>>>>>> driver side, and the size of the records is a minimum of 1M.
>>>>>>             df = modifiedData  # Assume it to be query results stored as a DF
>>>>>>
>>>>>>             df = df.withColumn("uniqueID", lit("1"))
>>>>>>
>>>>>>             df = df.withColumn("row_num", row_number().over(
>>>>>>                 Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
>>>>>>             ))
>>>>>>             tokenUrl = ""
>>>>>>             # tokenUrl = ""
>>>>>>             policyUrl = ""
>>>>>>             tokenBody = {"Username": "", "Password": "", "CustomerName": ""}
>>>>>>
>>>>>>             def get_token(url, payload):
>>>>>>                 try:
>>>>>>                     print("Getting Token")
>>>>>>                     response = requests.request("POST", url, data=payload)
>>>>>>                     data = response.json()
>>>>>>                     if data['ErrorDescription'] == 'Success':
>>>>>>                         print(":::Token Generated::::")
>>>>>>                         return data['Token']
>>>>>>                     print('TokenNotGeneratedFrom: ')
>>>>>>                     # raise TokenNotGeneratedFrom(500, 'Token not Generated from ')
>>>>>>                     return None  # no token was issued
>>>>>>                 except Exception as e:
>>>>>>                     print('TokenNotGeneratedFrom: ' + str(e))
>>>>>>                     # raise TokenNotGeneratedFrom(500, str(e))
>>>>>>
>>>>>>             def call_to_cust_bulk_api(url, payload):
>>>>>>                 print("Calling Bulk API")
>>>>>>                 try:
>>>>>>                     # TODO: write code...
>>>>>>                     headers = {'content-type': 'application/json'}
>>>>>>                     print(":::::::jsn load::::")
>>>>>>                     # print(json.dumps(payload))
>>>>>>                     # print(payload)
>>>>>>                     response = requests.post(url, data=json.dumps(payload), headers=headers)
>>>>>>                     # print(json.dumps(payload))
>>>>>>                     data = response.json()
>>>>>>                     return data
>>>>>>                 except Exception as e:
>>>>>>                     print('ExceptionInPushingDataTo: ' + str(e))
>>>>>>                     # raise ExceptionInPushingDataTo(500, str(e))
>>>>>>
>>>>>>             total_count = df.count()
>>>>>>             i = 1
>>>>>>             while i <= total_count:  # <= so a final one-row batch is not skipped
>>>>>>                 rangeNum = i + 3999
>>>>>>                 print("Range Num:::")
>>>>>>                 print(rangeNum)
>>>>>>                 df1 = df.filter((col("row_num") >= i) & (col("row_num") <= rangeNum))
>>>>>>                 df1.cache()
>>>>>>                 maxValue = df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
>>>>>>                 finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
>>>>>>                 print("finalDF count:::", finalDF.count())
>>>>>>                 token = get_token(tokenUrl, tokenBody)
>>>>>>
>>>>>>                 result = json.loads(finalDF.toPandas().to_json(orient="records"))
>>>>>>                 # token = get_token(tokenUrl, tokenBody)
>>>>>>                 custRequestBody = {
>>>>>>                     "Token": token,
>>>>>>                     "CustomerName": "",
>>>>>>                     "Object": "",
>>>>>>                     "Data": result
>>>>>>                 }
>>>>>>
>>>>>>                 # print("::::Customer Request Body::::::")
>>>>>>                 # print(json.dumps(custRequestBody))
>>>>>>                 response = call_to_cust_bulk_api(policyUrl, custRequestBody)
>>>>>>                 print(response)
>>>>>>                 finalDFStatus = finalDF.withColumn(
>>>>>>                     "edl_timestamp", to_timestamp(lit(F.TimeNow()))
>>>>>>                 ).withColumn("status_for_each_batch", lit(str(response)))
>>>>>>
>>>>>>
>>>>>>                 print("Max Value:::")
>>>>>>                 print(maxValue)
>>>>>>                 print("Next I:::")
>>>>>>                 i = rangeNum + 1
>>>>>>                 print(i)
>>>>>>
>>>>>> This is my very first approach to hitting APIs with Spark. So,
>>>>>> could anyone please help me redesign the approach, or share some
>>>>>> links or references that I can use to dig deeper and correct this
>>>>>> myself? How can I scale this?
>>>>>>
>>>>>>
>>>>>> Any help is much appreciated.
>>>>>>
>>>>>> TIA,
>>>>>> Sid
>>>>>>
>>>>>
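
The batching loop in the original message can also be expressed in plain Python, in the spirit of the "simple Python program" suggested earlier in the thread. This is only a sketch under assumptions: row_ranges and build_request_body are hypothetical helper names, the token value is a placeholder, and the request body mirrors the JSON shape quoted above.

```python
from typing import Any, Dict, Iterator, List, Tuple


def row_ranges(total_count: int, batch_size: int = 4000) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (start, end) row_num ranges covering 1..total_count."""
    start = 1
    while start <= total_count:
        # Clamp the last range so it never runs past the final row.
        end = min(start + batch_size - 1, total_count)
        yield start, end
        start = end + 1


def build_request_body(token: str, records: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Assemble one bulk request in the shape described in the original message."""
    return {"Token": token, "CustomerName": "", "Object": "", "Data": records}


ranges = list(row_ranges(10, batch_size=4))
print(ranges)  # [(1, 4), (5, 8), (9, 10)]

body = build_request_body("example-token", [{"A": "1"}, {"A": "2"}])
print(body["Data"])  # [{'A': '1'}, {'A': '2'}]
```

Each (start, end) pair corresponds to one pass of the while loop (i and rangeNum), with the final partial batch included.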
