Hi Enrico,
Thanks for helping me to understand the mistakes.
My end goal is something like the below:
1. Generate the data
2. Send the data in batches of 4k records, since the API can accept
4k records in one request.
3. Each request body would look like the below:
{
"Token": "", "CustomerName":"", "Object":"", "Data":
[{"A":"1"},{"A":"2"}]
}
4. The token will be generated first and then passed to the
'Token' key in the request body.
For the above goal, I initially wrote something like the below, which
gives a heap error because the data frame is collected on the
driver side and the dataset has a minimum of 1M records.
import json
import requests
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, row_number

df = modifiedData
df = df.withColumn("uniqueID", lit("1"))
df = df.withColumn("row_num", row_number().over(
    Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
))

tokenUrl = ""
policyUrl = ""
tokenBody = {"Username": "", "Password": "", "CustomerName": ""}


def get_token(url, payload):
    try:
        print("Getting Token")
        response = requests.request("POST", url, data=payload)
        data = response.json()
        if data['ErrorDescription'] == 'Success':
            print(":::Token Generated::::")
            return data['Token']
        print('TokenNotGeneratedFrom: ')
        # raise TokenNotGeneratedFrom(500, 'Token not Generated from ')
    except Exception as e:
        print('TokenNotGeneratedFrom: ' + str(e))
        # raise TokenNotGeneratedFrom(500, str(e))


def call_to_cust_bulk_api(url, payload):
    print("Calling Bulk API")
    try:
        headers = {'content-type': 'application/json'}
        response = requests.post(url, data=json.dumps(payload), headers=headers)
        return response.json()
    except Exception as e:
        print('ExceptionInPushingDataTo: ' + str(e))
        # raise ExceptionInPushingDataTo(500, str(e))


total_count = df.count()
i = 1
while i <= total_count:
    rangeNum = i + 3999
    print("Range Num:::", rangeNum)

    df1 = df.filter((col("row_num") >= i) & (col("row_num") <= rangeNum))
    df1.cache()
    maxValue = df1.select(F.max(col("row_num")).alias("max_val")) \
        .toPandas()['max_val'].iloc[0]
    finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
    print("finalDF count:::", finalDF.count())

    token = get_token(tokenUrl, tokenBody)
    # toPandas() pulls the whole 4k batch onto the driver
    result = json.loads(finalDF.toPandas().to_json(orient="records"))
    custRequestBody = {
        "Token": token,
        "CustomerName": "",
        "Object": "",
        "Data": result
    }
    response = call_to_cust_bulk_api(policyUrl, custRequestBody)
    print(response)

    finalDFStatus = finalDF \
        .withColumn("edl_timestamp", F.current_timestamp()) \
        .withColumn("status_for_each_batch", lit(str(response)))

    print("Max Value:::", maxValue)
    i = rangeNum + 1
    print("Next I:::", i)
This is my very first approach to hitting APIs with Spark. So,
could you please help me redesign the approach, or share some links
or references I can use to go into the depths of this and correct
myself? How can I scale this?
Any help is much appreciated. Thank you so much again for your time.
TIA,
Sid
On Fri, Jun 10, 2022 at 8:54 PM Enrico Minack <i...@enrico.minack.dev>
wrote:
Hi,
This adds a column with value "1" (string) *in all rows*:

df = df.withColumn("uniqueID", lit("1"))

This counts the rows for all rows that have the same uniqueID,
*which are all rows*. The window does not make much sense.
And it orders all rows that have the same uniqueID by
uniqueID. Does not make much sense either.

df = df.withColumn("row_num", row_number().over(
    Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
))
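If a global row number is really needed, one alternative (an
illustration from my side; "spark" is your session) is zipWithIndex
on the underlying RDD, which avoids shuffling every row into a
single window partition:

from pyspark.sql.types import LongType, StructField, StructType

# append a 1-based row number without a constant-key window
schema = StructType(df.schema.fields + [StructField("row_num", LongType())])
numbered = df.rdd.zipWithIndex().map(lambda pair: pair[0] + (pair[1] + 1,))
df = spark.createDataFrame(numbered, schema)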
Then it looks like it takes the first 4000 rows (row_num from 1 to
4000) and tries to send them via HTTP POST. Then it moves the
range by one and sends row 2 to 4001 (mostly overlapped with the
first POST).
It is not clear if the "Data" field is meant to be all rows or
only a single row. Either way, this is not what happens. Please
consider the difference between a Column and a DataFrame in Spark.
This is very different from Pandas.
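A tiny illustration of that difference:

from pyspark.sql.functions import col

c = col("A")   # a Column is an unevaluated expression; it holds no data
print(c)       # prints the expression, e.g. Column<'A'>, never row values
# values only materialize through a DataFrame action, e.g. df.select(c).show()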
I think you have to rethink your approach. Using Spark means big
data. This approach is iterative and single-threaded.
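A distributed sketch (illustrative only; it assumes the batch size,
URLs, and the get_token / call_to_cust_bulk_api helpers from your
code are available on the executors) would push the POSTs into the
partitions with mapPartitions:

BATCH_SIZE = 4000  # the API accepts up to 4k records per request

def send_partition(rows):
    # runs on each executor: build 4k-record batches and POST them
    token = get_token(tokenUrl, tokenBody)
    batch = []
    for row in rows:
        batch.append(row.asDict())
        if len(batch) == BATCH_SIZE:
            yield call_to_cust_bulk_api(policyUrl, {"Token": token,
                "CustomerName": "", "Object": "", "Data": batch})
            batch = []
    if batch:  # trailing partial batch
        yield call_to_cust_bulk_api(policyUrl, {"Token": token,
            "CustomerName": "", "Object": "", "Data": batch})

# one response per POSTed batch, produced in parallel across partitions
statuses = finalDF.rdd.mapPartitions(send_partition).collect()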
Enrico
On 10.06.22 at 16:01, Sid wrote:
Hi Enrico,
Thanks for your time. Much appreciated.
I am expecting the payload to be a JSON string per record, like
below:
{"A":"some_value","B":"some_value"}
Where A and B are the columns in my dataset.
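Something like this is what I have in mind (a sketch):

from pyspark.sql.functions import struct, to_json

# one JSON string per row, e.g. {"A":"some_value","B":"some_value"}
df = df.withColumn("payload", to_json(struct("A", "B")))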
On Fri, Jun 10, 2022 at 6:09 PM Enrico Minack
<i...@enrico.minack.dev> wrote:
Sid,
just recognized you are using the Python API here. Then
struct(*colsListToBePassed) should be correct, given it
takes a list of strings.
Your method call_to_cust_bulk_api takes argument payload,
which is a Column. This is then used in
custRequestBody. That is a pretty strange use of a column
expression. What do you expect print(payload) to be?
I recommend splitting that complex command into multiple
commands to find out what "an error of column not iterable"
refers to.
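For example (a sketch using the names from your code), splitting
it up shows which step actually raises the error:

from pyspark.sql.functions import struct, to_json

cols_struct = struct(*colsListToBePassed)  # does this line fail?
json_col = to_json(cols_struct)            # or this one?
finalDF = finalDF.withColumn("payload_json", json_col)
finalDF.select("payload_json").show(truncate=False)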
Enrico
On 10.06.22 at 13:39, Enrico Minack wrote:
Hi Sid,
finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions()) \
    .withColumn("status_for_batch",
                call_to_cust_bulk_api(policyUrl,
                                      to_json(struct(*colsListToBePassed))))
You are calling withColumn with the result of
call_to_cust_bulk_api as the second argument. That
result looks like it is of type string. But withColumn
expects type Column. You can turn that string into a
Column using lit:
finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions()) \
    .withColumn("status_for_batch",
                lit(call_to_cust_bulk_api(policyUrl,
                                          to_json(struct(*colsListToBePassed)))))
You are saying that gives you an error of column not
iterable. I reckon the struct(*colsListToBePassed) is
wrong.
Method struct requires a single string followed by a
list of strings. Given your colsListToBePassed is a list
of strings, this does not work. Try:
struct(colsListToBePassed.head, colsListToBePassed.tail: _*)
Alternatively, struct requires a list of Column, so
try this:

struct(colsListToBePassed.map(col): _*)
The API is pretty clear about the types it expects.
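In the Python API, the equivalent options would be (a sketch;
colsListToBePassed is assumed to be a list of column-name strings):

from pyspark.sql.functions import col, struct

s1 = struct(*colsListToBePassed)                    # varargs of column names
s2 = struct(*[col(c) for c in colsListToBePassed])  # varargs of Columns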
If you are still having errors, please paste the code
and the error.
Enrico
On 09.06.22 at 21:31, Sid wrote:
Hi Experts,
I am facing a problem while passing a column to a
method. The problem is described in detail here:
https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark
TIA,
Sid