Satendra Kumar created SPARK-50203:
--------------------------------------
Summary: Data ingestion into an Iceberg table (S3 bucket) via a
Spark batch job fails with an out-of-memory (OOM) error.
Key: SPARK-50203
URL: https://issues.apache.org/jira/browse/SPARK-50203
Project: Spark
Issue Type: Question
Components: Spark Core
Affects Versions: 3.5.3
Environment: macOS, 48 GB memory, 16 cores, M3
Reporter: Satendra Kumar
Fix For: 4.0.0
While ingesting data into an Iceberg table on S3 with a Spark batch job, the
process fails with an out-of-memory (OOM) error. Initial investigation suggests
that the bucket transform in the partition spec, bucket(1000, customer_id), may
be the cause: when bucketing is removed, the code runs successfully.
Here is the current Spark code being used:
{quote}
{code:scala}
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}

import java.sql.Date
import java.time.LocalDate
import scala.util.Random

object IcebergDataGenerator {
  def main(args: Array[String]): Unit = {
    val logger: Logger = LoggerFactory.getLogger(this.getClass)

    val spark =
      SparkSession
        .builder()
        .appName("Iceberg Data Generator")
        .master("local[*]")
        .config("spark.driver.memory", "16g")
        .config("spark.executor.memory", "16g")
        .config("spark.driver.maxResultSize", "2g")
        .config("spark.sql.extensions",
          "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.rest",
          "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.rest.type", "rest")
        .config("spark.sql.catalog.rest.uri", "http://127.0.0.1:9001/iceberg/")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.shuffle.partitions", "1000")
        .config("spark.default.parallelism", "32")
        .getOrCreate()
    import spark.implicits._

    spark.sql("CREATE DATABASE IF NOT EXISTS rest.db;")
    spark.sql(
      """
        |CREATE TABLE IF NOT EXISTS rest.db.customers2 (
        |  customer_id INT,
        |  customer_name STRING,
        |  date DATE,
        |  transaction_details STRING
        |) USING iceberg
        |PARTITIONED BY (bucket(1000, customer_id), days(date))
      """.stripMargin)

    // Generate one row per customer id
    def generateCustomerData(numbers: Seq[Int]): Seq[(Int, String, Date, String)] = {
      val random = new Random()
      numbers.map { i =>
        val customerId = i
        val customerName = s"Customer_$i"
        // Random date within the last 6 months (0 to 179 days ago)
        val date = Date.valueOf(LocalDate.now().minusDays(random.nextInt(180)))
        val transactionDetails = s"Transaction details for customer $i"
        (customerId, customerName, date, transactionDetails)
      }
    }

    // Generate 100,000 customers
    val customerData = generateCustomerData(1 to 100000)

    // Convert to DataFrame
    val customerDF = customerData.toDF("customer_id", "customer_name", "date",
      "transaction_details")

    // Write the data to an Apache Iceberg table
    logger.info(s"partition count: ${customerDF.rdd.getNumPartitions}")
    customerDF
      .write
      .format("iceberg")
      .mode("append")
      .save("rest.db.customers2")

    // Read the table back and log the row count
    val df = spark.sql("SELECT * FROM rest.db.customers2;")
    logger.info("Count: " + df.count())

    // Stop Spark session
    spark.stop()
  }
}
{code}
{quote}
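For scale, it may help to estimate how many distinct partitions this write can touch. The partition spec combines 1000 buckets with a daily transform over 180 possible dates, and the job writes 100,000 rows. The sketch below is only a back-of-the-envelope estimate: it assumes bucket values and dates are spread uniformly and independently, which is an approximation of the real hash-based bucket transform, and `PartitionEstimate` is a hypothetical helper name that is not part of the job.

```scala
// Rough estimate of how many distinct Iceberg partitions the write touches.
// Assumption: bucket values and day offsets are uniform and independent.
object PartitionEstimate {
  val buckets = 1000 // from bucket(1000, customer_id)
  val days = 180     // from random.nextInt(180)
  val rows = 100000  // from generateCustomerData(1 to 100000)

  // Upper bound on distinct (bucket, day) combinations.
  val totalCells: Long = buckets.toLong * days

  // Expected number of non-empty combinations when `rows` rows
  // land uniformly at random among `totalCells` combinations.
  val expectedPartitions: Double =
    totalCells * (1.0 - math.pow(1.0 - 1.0 / totalCells, rows))

  // Average number of rows in each non-empty partition.
  val rowsPerPartition: Double = rows / expectedPartitions

  def main(args: Array[String]): Unit = {
    println(s"possible partitions: $totalCells")
    println(f"expected non-empty partitions: $expectedPartitions%.0f")
    println(f"average rows per non-empty partition: $rowsPerPartition%.2f")
  }
}
```

Under these assumptions the write spreads across tens of thousands of partitions with only a row or two in each, so each partition gets at least one very small data file. Whether that alone explains the OOM is not confirmed.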
*Questions:*
1. Can bucketing be effectively used with Iceberg tables in Spark?
2. What could be causing the OOM issue, and are there potential workarounds?
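One thing worth ruling out for question 2: with `.master("local[*]")` the whole job runs inside the driver JVM, and the Spark configuration documentation notes that in client mode `spark.driver.memory` must not be set through `SparkConf` in the application, because the driver JVM has already started by then. The `16g` in the builder may therefore not be the heap actually in effect. A minimal check, where `HeapCheck` is a hypothetical helper name and not part of the job above:

```scala
// Reports the heap actually available to the current JVM.
object HeapCheck {
  // Maximum heap the JVM will attempt to use, in MiB.
  def maxHeapMiB: Long = Runtime.getRuntime.maxMemory / (1024L * 1024L)

  def main(args: Array[String]): Unit = {
    println(s"JVM max heap: $maxHeapMiB MiB")
  }
}
```

Logging `HeapCheck.maxHeapMiB` right after `getOrCreate()` would show the real limit. If it is far below 16 GiB, setting the heap at launch (for example `--driver-memory` with spark-submit, or `-Xmx` as a JVM option) would be the next thing to try.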
Let me know if any additional details would help.