Satendra Kumar created SPARK-50203:
--------------------------------------

             Summary: Data ingestion into the Iceberg table (S3 bucket) via a 
Spark batch job is failing due to an Out of Memory error.
                 Key: SPARK-50203
                 URL: https://issues.apache.org/jira/browse/SPARK-50203
             Project: Spark
          Issue Type: Question
          Components: Spark Core
    Affects Versions: 3.5.3
         Environment: Running on MacOS + 48GB Memory + 16Cores + M3
            Reporter: Satendra Kumar
             Fix For: 4.0.0


While attempting to ingest data into an Iceberg table on S3 using a Spark batch 
job, the process fails with an OOM error. Initial investigation suggests that 
the use of bucketing as a partitioning strategy may be the cause. When 
bucketing is removed, the code runs successfully.

 

Here is the current Spark code being used:
{quote} 
{code:java}
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
import java.sql.Date
import java.time.LocalDate
import scala.util.Random
object IcebergDataGenerator {
  def main(args: Array[String]): Unit = {
    val logger: Logger = LoggerFactory.getLogger(this.getClass)
    val spark =
      SparkSession
        .builder()
        .appName("Iceberg Data Generator")
        .master("local[*]")
        .config("spark.driver.memory", "16g")
        .config("spark.executor.memory", "16g")
        .config("spark.driver.maxResultSize", "2g")
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.rest.type", "rest")
        .config("spark.sql.catalog.rest.uri", "http://127.0.0.1:9001/iceberg/")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.shuffle.partitions", "1000")
        .config("spark.default.parallelism", "32")
        .getOrCreate()
    import spark.implicits._
    spark.sql("CREATE DATABASE IF NOT EXISTS rest.db;")
    spark.sql(
      """
        |CREATE TABLE IF NOT EXISTS rest.db.customers2 (
        |  customer_id INT,
        |  customer_name STRING,
        |  date DATE,
        |  transaction_details STRING
        |) USING iceberg
        |PARTITIONED BY (bucket(1000, customer_id), days(date))
  """.stripMargin)
    // Generate synthetic customer rows, one per input ID
    def generateCustomerData(numbers: Seq[Int]): Seq[(Int, String, Date, String)] = {
      val random = new Random()
      numbers.map { i =>
        val customerId = i
        val customerName = s"Customer_$i"
        // Random date within the last 6 months
        val date = Date.valueOf(LocalDate.now().minusDays(random.nextInt(180)))
        val transactionDetails = s"Transaction details for customer $i"
        (customerId, customerName, date, transactionDetails)
      }
    }
    // Generate 100,000 customers
    val customerData = generateCustomerData(1 to 100000)
    // Convert to DataFrame
    val customerDF = customerData.toDF("customer_id", "customer_name", "date", "transaction_details")
    // Write the data to an Apache Iceberg table
    logger.info(s"partition count:  ${customerDF.rdd.getNumPartitions}")
    customerDF
      .write
      .format("iceberg")
      .mode("append")
      .save("rest.db.customers2")
    val df = spark.sql("SELECT * FROM rest.db.customers2;")
    logger.info("Count:   " + df.count())
    // Stop Spark session
    spark.stop()
  }
}
 
{code}
 
{quote}
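
For context on why the partition spec above is a plausible suspect, here is a rough back-of-the-envelope sketch of how many distinct partitions this write may touch. It assumes rows are spread uniformly and independently over the 1000 buckets and the 180 possible dates; that uniformity is an assumption for illustration, and the object and method names (PartitionEstimate, expectedOccupied) are hypothetical, not part of Spark or Iceberg. It estimates partition counts only and does not measure memory use.

```scala
// Back-of-the-envelope estimate, not a measurement of the job above.
object PartitionEstimate {
  // Expected number of non-empty cells when `rows` items are placed
  // uniformly at random into `cells` cells:
  //   cells * (1 - (1 - 1/cells)^rows)
  def expectedOccupied(cells: Long, rows: Long): Double =
    cells * (1.0 - math.pow(1.0 - 1.0 / cells, rows.toDouble))

  def main(args: Array[String]): Unit = {
    val buckets = 1000L   // from bucket(1000, customer_id)
    val days    = 180L    // random.nextInt(180) yields up to 180 distinct dates
    val rows    = 100000L // from generateCustomerData(1 to 100000)

    val cells    = buckets * days // upper bound on distinct partitions
    val occupied = expectedOccupied(cells, rows)

    println(s"possible partitions: $cells")
    println(f"expected non-empty partitions (uniform assumption): $occupied%.0f")
    println(f"average rows per non-empty partition: ${rows / occupied}%.2f")
  }
}
```

Under these assumptions the 100,000 rows are spread over tens of thousands of partitions, so most partitions receive only one or two rows. Whether that fan-out is what actually exhausts memory in this job is not established here.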
 
 

*Questions:*

 

 1. Can bucketing be effectively used with Iceberg tables in Spark?

 2. What could be causing the OOM issue, and are there potential workarounds?

 

Let me know if you'd like any additional details added!

 


