Hi,

you can parallelize sending the requests as follows (just sketching code):

# gets an iterable of Pandas DataFrames
def send(pdfs: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
    responses = []
    # for each Pandas DataFrame (could be smaller than 4000 rows, reduce parallelism in that case)
    for pdf in pdfs:
        # split the DataFrame into chunks of 4000 rows
        chunks = [pdf.iloc[i:i + 4000] for i in range(0, len(pdf), 4000)]
        for chunk in chunks:
            # send a chunk, memorize the response status
            response_status = ...  # e.g. requests.post(url, ...).status_code
            responses.append(response_status)
    yield pd.DataFrame(responses)

# number of parallel requests:
parallelism = 10

# define schema of result DataFrame
response_schema = StructType(...)

# process data in parallel:
df.repartition(parallelism).mapInPandas(send, response_schema).collect()
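
Spelled out end-to-end, a minimal runnable version of that sketch could look like the below. The endpoint URL, the request-body shape (just a "Data" key), and the status-only result schema are assumptions here; adapt them to your API:

import json
import requests
import pandas as pd
from typing import Iterable
from pyspark.sql.types import StructType, StructField, IntegerType

# assumption: your bulk endpoint
apiUrl = "https://example.com/bulk"

def send(pdfs: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
    responses = []
    for pdf in pdfs:
        for start in range(0, len(pdf), 4000):
            chunk = pdf.iloc[start:start + 4000]
            # assumption: the API accepts a JSON body with the records under "Data"
            body = {"Data": json.loads(chunk.to_json(orient="records"))}
            # one POST per 4000-row chunk, executed on the executors
            responses.append({"status": requests.post(apiUrl, json=body).status_code})
    yield pd.DataFrame(responses)

response_schema = StructType([StructField("status", IntegerType(), True)])

df.repartition(parallelism).mapInPandas(send, response_schema).collect()

This way the requests happen inside the executors, not on the driver, and the driver only collects the small status DataFrame.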

Enrico


On 11.06.22 18:31, Sid wrote:
Hi Enrico,

Thanks for helping me to understand the mistakes.

My end goal is something like the below:

 1. Generate the data.
 2. Send the data in batches of 4k records, since the API can accept
    4k records at once.
 3. Each record would be as below:

    {
        "Token": "", "CustomerName": "", "Object": "",
        "Data": [{"A":"1"},{"A":"2"}]
    }

 4. The token will be generated first, then passed to the 'Token' key
    in the data.

For the above goal, I initially wrote something like the below, which gives a heap error because the data frame is getting created on the driver side, and the dataset is a minimum of 1M records.
            df = modifiedData

            df = df.withColumn("uniqueID", lit("1"))

            df = df.withColumn("row_num", row_number().over(
                Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
            ))
            tokenUrl = ""
            policyUrl = ""
            tokenBody = {"Username": "", "Password": "", "CustomerName": ""}

            def get_token(url, payload):
                token = None
                try:
                    print("Getting Token")
                    response = requests.request("POST", url, data=payload)
                    data = response.json()
                    if data['ErrorDescription'] == 'Success':
                        token = data['Token']
                        print(":::Token Generated::::")
                    else:
                        print('TokenNotGeneratedFrom: ')
                        # raise TokenNotGeneratedFrom(500, 'Token not Generated from ')
                    return token
                except Exception as e:
                    print('TokenNotGeneratedFrom: ' + str(e))
                    # raise TokenNotGeneratedFrom(500, str(e))

            def call_to_cust_bulk_api(url, payload):
                print("Calling Bulk API")
                try:
                    # TODO: write code...
                    headers = {'content-type': 'application/json'}
                    print(":::::::jsn load::::")
                    # print(json.dumps(payload))
                    # print(payload)
                    response = requests.post(url, data=json.dumps(payload), headers=headers)
                    # print(json.dumps(payload))
                    data = response.json()
                    return data
                except Exception as e:
                    print('ExceptionInPushingDataTo: ' + str(e))
                    # raise ExceptionInPushingDataTo(500, str(e))

            total_count = df.count()
            i = 1
            while i <= total_count:
                rangeNum = i + 3999
                print("Range Num:::")
                print(rangeNum)
                df1 = df.filter((col("row_num") >= i) & (col("row_num") <= rangeNum))
                df1.cache()
                maxValue = df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
                finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
                print("finalDF count:::", finalDF.count())
                token = get_token(tokenUrl, tokenBody)

                result = json.loads(finalDF.toPandas().to_json(orient="records"))
                # token = get_token(tokenUrl, tokenBody)
                custRequestBody = {
                    "Token": token,
                    "CustomerName": "",
                    "Object": "",
                    "Data": result
                }

                # print("::::Customer Request Body::::::")
                # print(json.dumps(custRequestBody))
                response = call_to_cust_bulk_api(policyUrl, custRequestBody)
                print(response)
                finalDFStatus = finalDF.withColumn("edl_timestamp", to_timestamp(lit(F.TimeNow()))).withColumn(
                    "status_for_each_batch",
                    lit(str(response)))


                print("Max Value:::")
                print(maxValue)
                print("Next I:::")
                i = rangeNum + 1
                print(i)

This is my very first approach to hitting APIs with Spark. So, could you please help me redesign the approach, or share some links or references that I can use to study this in depth and rectify it myself? How can I scale this?


Any help is much appreciated. Thank you so much again for your time.

TIA,
Sid




On Fri, Jun 10, 2022 at 8:54 PM Enrico Minack <i...@enrico.minack.dev> wrote:

    Hi,

    This adds a column with value "1" (string) *in all rows*:

        df = df.withColumn("uniqueID", lit("1"))

    This counts the rows over all rows that have the same uniqueID,
    *which are all rows*, so the window does not make much sense.
    And it orders all rows that have the same uniqueID by uniqueID,
    which does not make much sense either:

        df = df.withColumn("row_num", row_number().over(
            Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))))

    Then it looks like it takes the first 4000 rows (row_num 1 to
    4000) and tries to send them via HTTP POST. Then it moves the
    range by 4000 and sends rows 4001 to 8000, one batch after the
    other, all on the driver.

    It is not clear if the "Data" field is meant to be all rows or
    only a single row. Either way, this is not what happens. Please
    consider the difference between a Column and a DataFrame in Spark.
    This is very different from Pandas.
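
    For example, a minimal illustration of the difference (assuming
    pyspark.sql.functions is imported):

        c = col("row_num") <= 4000   # a Column: just an expression, no data
        batch = df.filter(c)         # a DataFrame: still lazy, nothing sent
        rows = batch.collect()       # only now are rows materialized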

    I think you have to rethink your approach. Using Spark means big
    data. This approach is iterative and single-threaded.

    Enrico


    On 10.06.22 16:01, Sid wrote:
    Hi Enrico,

    Thanks for your time. Much appreciated.

    I am expecting the payload to be a JSON string, one record
    like below:

    {"A":"some_value","B":"some_value"}

    Where A and B are the columns in my dataset.


    On Fri, Jun 10, 2022 at 6:09 PM Enrico Minack
    <i...@enrico.minack.dev> wrote:

        Sid,

        just recognized you are using the Python API here. Then
        struct(*colsListToBePassed) should be correct, given it
        takes a list of strings.

        Your method call_to_cust_bulk_api takes argument payload,
        which is a Column. This is then used in custRequestBody.
        That is a pretty strange use of a column expression. What
        do you expect print(payload) to be?
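
        (For illustration: printing a Column gives something like

            Column<'to_json(struct(A, B))'>

        i.e. an unevaluated expression, not your row's data.)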

        I recommend splitting that complex command into multiple
        commands to find out what the "column not iterable" error
        refers to.
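
        For example, something like:

            json_col = to_json(struct(*colsListToBePassed))       # a Column
            status = call_to_cust_bulk_api(policyUrl, json_col)   # payload is a Column here
            finalDF = finalDF.withColumn("status_for_batch", lit(status))

        That makes it obvious which of the three steps raises the error.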

        Enrico


        On 10.06.22 13:39, Enrico Minack wrote:
        Hi Sid,

        finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
                         .withColumn("status_for_batch",
                             call_to_cust_bulk_api(policyUrl,
                                 to_json(struct(*colsListToBePassed))))

        You are calling withColumn with the result of
        call_to_cust_bulk_api as the second argument. That
        result looks like it is of type string. But withColumn
        expects type Column. You can turn that string into a
        Column using lit:

        finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
                         .withColumn("status_for_batch",
                             lit(call_to_cust_bulk_api(policyUrl,
                                 to_json(struct(*colsListToBePassed)))))

        You are saying that gives you a "column not iterable"
        error. I reckon the struct(*colsListToBePassed) is
        wrong.

        Method struct requires a single string followed by a
        list of strings. Given your colsListToBePassed is a list
        of strings, this does not work. Try:

          struct(colsListToBePassed.head, colsListToBePassed.tail: _*)

        Alternatively, struct requires a list of Column, so
        try this:

          struct(colsListToBePassed.map(col): _*)
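
        (For the Python API, the equivalent would be
        struct(*[col(c) for c in colsListToBePassed]), or simply
        struct(*colsListToBePassed), as the follow-up above notes.)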

        The API is pretty clear about the types it expects.


        If you are still getting errors, please paste the
        code and the error.

        Enrico



        Am 09.06.22 um 21:31 schrieb Sid:
        Hi Experts,

        I am facing one problem while passing a column to the
        method.  The problem is described in detail here:

        
        https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark

        TIA,
        Sid



