soumilshah1995 opened a new issue, #11391:
URL: https://github.com/apache/hudi/issues/11391
Description:
I'm encountering an issue while attempting to use DynamoDB Based Lock with
an Apache Hudi PySpark job running locally. The goal is to have the job access
DynamoDB in a specified region for locking purposes.
Configuration Used:
```
'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
'hoodie.cleaner.policy.failed.writes': 'LAZY',
'hoodie.write.lock.provider':
'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
'hoodie.write.lock.dynamodb.table': 'hudi-lock-table', # DynamoDB table name for locking
'hoodie.write.lock.dynamodb.region': curr_region, # DynamoDB region
'hoodie.write.lock.dynamodb.endpoint_url':
f'dynamodb.{curr_region}.amazonaws.com',
'hoodie.write.lock.dynamodb.billing_mode': 'PAY_PER_REQUEST',
```
Error Logs
```
org.apache.hudi.exception.HoodieException: Unable to load class
at
org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:58)
...
Caused by: java.lang.ClassNotFoundException:
org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
...
```
# Code
```
# Import everything the script uses up front. Any failure is reported on
# stdout instead of being raised, so later statements still run with whatever
# was imported before the failing line.
try:
    import os
    import sys
    import uuid
    import pyspark
    import datetime
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    import datetime
    # From here on the name `datetime` refers to the class, not the module.
    from datetime import datetime
    import random
    import pandas as pd
    from pyspark.sql.types import (
        StructType,
        StructField,
        StringType,
        DateType,
        FloatType,
    )
    from pyspark.sql.functions import col
    # Spelled correctly so the import succeeds and the success message below
    # is actually reached.
    from datetime import datetime
    print("Imports loaded ")
except Exception as e:
    print("error", e)
# Versions used to build the Maven coordinates handed to spark-submit.
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"

# Two artifacts are requested, comma-separated as --packages expects:
#   * the Hudi Spark bundle, and
#   * hudi-aws-bundle, which provides
#     org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider.
# With only the Spark bundle on the classpath, loading that lock provider
# fails with ClassNotFoundException.
SUBMIT_ARGS = (
    "--packages "
    f"org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION},"
    f"org.apache.hudi:hudi-aws-bundle:{HUDI_VERSION} "
    "pyspark-shell"
)
# Must be set before the session is created so the JVM is launched with it.
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
# Use the interpreter running this script for the Python workers.
os.environ['PYSPARK_PYTHON'] = sys.executable

spark = (
    SparkSession.builder
    .config('spark.serializer',
            'org.apache.spark.serializer.KryoSerializer')
    .config('spark.sql.extensions',
            'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
    .config('className', 'org.apache.hudi')
    .config('spark.sql.hive.convertMetastoreParquet', 'false')
    .getOrCreate()
)
# Column layout of the sample orders table: (name, Spark type); every column
# is nullable.
field_specs = [
    ("orderID", StringType()),
    ("productSKU", StringType()),
    ("customerID", StringType()),
    ("orderDate", StringType()),
    ("orderAmount", FloatType()),
    ("state", StringType()),
]
schema = StructType(
    [StructField(field_name, field_type, True)
     for field_name, field_type in field_specs]
)

# Sample rows, one tuple per order, in the same column order as the schema.
data = [
    ("order1", "prod001", "cust001", "2024-01-15", 150.00, "CA"),
    ("order002", "prod002", "cust002", "2024-01-16", 200.00, "NY"),
    ("order003", "prod003", "cust003", "2024-01-17", 300.00, "TX"),
    ("order004", "prod004", "cust004", "2024-01-18", 250.00, "FL"),
    ("order005", "prod005", "cust005", "2024-01-19", 100.00, "WA"),
    ("order006", "prod006", "cust006", "2024-01-20", 350.00, "CA"),
    ("order007", "prod007", "cust007", "2024-01-21", 400.00, "NY"),
]

# Build the DataFrame from the rows and print it to stdout.
df = spark.createDataFrame(data, schema)
df.show()
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM',
                  curr_region='us-east-1'
                  ):
    """Write ``spark_df`` to a local Hudi table using the DynamoDB lock provider.

    The options dict and the target path are printed before the write, and
    the DataFrame is saved in ``append`` mode.

    Args:
        spark_df: DataFrame to write.
        table_name: Hudi table name; also the last component of the path.
        db_name: Used only to build the ``database=...`` path component.
        method: Value for ``hoodie.datasource.write.operation``.
        table_type: Value for ``hoodie.datasource.write.table.type``.
        recordkey: Value for ``hoodie.datasource.write.recordkey.field``.
        precombine: Value for ``hoodie.datasource.write.precombine.field``.
        partition_fields: Value for
            ``hoodie.datasource.write.partitionpath.field``.
        index_type: Accepted but not read anywhere in this function.
        curr_region: Region used for the DynamoDB lock region and endpoint.
    """
    target_path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    # Table identity and write behaviour.
    write_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,
    }

    # Optimistic concurrency control with a DynamoDB-backed lock.
    lock_options = {
        'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
        'hoodie.cleaner.policy.failed.writes': 'LAZY',
        'hoodie.write.lock.provider': 'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
        'hoodie.write.lock.dynamodb.table': 'hudi-lock-table',  # DynamoDB table name for locking
        'hoodie.write.lock.dynamodb.region': curr_region,  # DynamoDB region
        'hoodie.write.lock.dynamodb.endpoint_url': f'dynamodb.{curr_region}.amazonaws.com',
        'hoodie.write.lock.dynamodb.billing_mode': 'PAY_PER_REQUEST',
    }

    # Merge order keeps the write options first, then the lock options.
    combined_options = {**write_options, **lock_options}

    print(combined_options)
    print("\n")
    print(target_path)
    print("\n")

    writer = spark_df.write.format("hudi")
    writer = writer.options(**combined_options)
    writer = writer.mode("append")
    writer.save(target_path)
# Write the sample orders with the function's default operation and table
# type, keyed by orderID and partitioned by state.
# NOTE: index_type is passed here, but write_to_hudi never reads that
# parameter, so "RECORD_INDEX" has no effect on the write.
write_to_hudi(
    spark_df=df,
    table_name="orders",
    db_name="default",
    recordkey="orderID",
    precombine="orderDate",
    partition_fields="state",
    index_type="RECORD_INDEX",
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]