[ 
https://issues.apache.org/jira/browse/SPARK-50203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satendra Kumar updated SPARK-50203:
-----------------------------------
    Description: 
While attempting to ingest data into an Iceberg table on S3 using a Spark batch 
job, the process fails with an OOM error. Initial investigation suggests that 
the use of bucketing as a partitioning strategy may be the cause. When 
bucketing is removed, the code runs successfully.

 

Here is the current Spark code being used:
{quote} 
{code:java}
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
import java.sql.Date
import java.time.LocalDate
import scala.util.Random
/**
 * Generates synthetic customer rows and appends them to the Iceberg table
 * `rest.db.customers2` through a REST catalog.
 *
 * The table is partitioned by `bucket(1000, customer_id)` and `days(date)`.
 * After the append, the table is read back and its row count is logged.
 */
object IcebergDataGenerator {

  /**
   * Entry point: builds a local Spark session, creates the database and table
   * if they do not exist, writes 100,000 generated rows and logs the row count.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val logger: Logger = LoggerFactory.getLogger(this.getClass)

    // NOTE(review): with master "local[*]" the tasks run inside the JVM that is
    // already executing this code, so the driver/executor memory values set here
    // are presumably not applied to the running heap -- confirm, and size the heap
    // at JVM launch if a larger heap is actually required.
    val spark =
      SparkSession
        .builder()
        .appName("Iceberg Data Generator")
        .master("local[*]")
        .config("spark.driver.memory", "16g")
        .config("spark.executor.memory", "16g")
        .config("spark.driver.maxResultSize", "2g")
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.rest.type", "rest")
        .config("spark.sql.catalog.rest.uri", "http://127.0.0.1:9001/iceberg/")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.shuffle.partitions", "1000")
        .config("spark.default.parallelism", "32")
        .getOrCreate()
    import spark.implicits._

    // Stop the session even when table creation, the write or the count fails.
    try {
      spark.sql("CREATE DATABASE IF NOT EXISTS rest.db;")
      spark.sql(
        """
          |CREATE TABLE IF NOT EXISTS rest.db.customers2 (
          |  customer_id INT,
          |  customer_name STRING,
          |  date DATE,
          |  transaction_details STRING
          |) USING iceberg
          |PARTITIONED BY (bucket(1000, customer_id), days(date))
  """.stripMargin)

      // Builds one (customer_id, customer_name, date, transaction_details) tuple
      // per input number; the number itself is used as the customer id.
      def generateCustomerData(numbers: Seq[Int]): Seq[(Int, String, Date, String)] = {
        val random = new Random()
        numbers.map { i =>
          val customerId = i
          val customerName = s"Customer_$i"
          // Random date between today and 179 days ago (roughly the last 6 months).
          val date = Date.valueOf(LocalDate.now().minusDays(random.nextInt(180)))
          val transactionDetails = s"Transaction details for customer $i"
          (customerId, customerName, date, transactionDetails)
        }
      }

      // Generate 100,000 customers.
      val customerData = generateCustomerData(1 to 100000)

      // Convert to a DataFrame whose column names match the table schema.
      val customerDF = customerData.toDF("customer_id", "customer_name", "date", "transaction_details")

      // Log how many Spark partitions (and therefore write tasks) the input has.
      logger.info(s"partition count:  ${customerDF.rdd.getNumPartitions}")

      // Append the rows to the Iceberg table.
      // NOTE(review): rows are written in generation order, i.e. not grouped by
      // (bucket, day); each task may therefore touch many table partitions --
      // verify the memory impact of the writer for this layout.
      customerDF
        .write
        .format("iceberg")
        .mode("append")
        .save("rest.db.customers2")

      // Read the table back and log the total row count.
      val df = spark.sql("SELECT * FROM rest.db.customers2;")
      logger.info(s"Count:   ${df.count()}")
    } finally {
      // Stop Spark session
      spark.stop()
    }
  }
}


{code}
{code:java}
Here is the error:{code}
{code:java}
 
[error] Exception in thread "main" org.apache.spark.SparkException: Job aborted 
due to stage failure: Task 4 in stage 4.0 failed 1 times, most recent failure: 
Lost task 4.0 in stage 4.0 (TID 506) (192.168.29.234 executor driver): 
java.lang.OutOfMemoryError: Java heap space
[error]     at 
java.base/java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:79)
[error]     at 
org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.<init>(CodecFactory.java:158)
[error]     at 
org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory.createCompressor(CodecFactory.java:219)
[error]     at 
org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:202)
[error]     at 
org.apache.iceberg.parquet.ParquetWriter.<init>(ParquetWriter.java:90)
[error]     at 
org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:360)
[error]     at 
org.apache.iceberg.parquet.Parquet$DataWriteBuilder.build(Parquet.java:760)
[error]     at 
org.apache.iceberg.data.BaseFileWriterFactory.newDataWriter(BaseFileWriterFactory.java:131)
[error]     at 
org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:52)
[error]     at 
org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:32)
[error]     at 
org.apache.iceberg.io.RollingFileWriter.openCurrentWriter(RollingFileWriter.java:108)
[error]     at 
org.apache.iceberg.io.RollingDataWriter.<init>(RollingDataWriter.java:47)
[error]     at 
org.apache.iceberg.io.FanoutDataWriter.newWriter(FanoutDataWriter.java:53)
[error]     at org.apache.iceberg.io.FanoutWriter.writer(FanoutWriter.java:63)
[error]     at org.apache.iceberg.io.FanoutWriter.write(FanoutWriter.java:51)
[error]     at 
org.apache.iceberg.io.FanoutDataWriter.write(FanoutDataWriter.java:31)
[error]     at 
org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:781)
[error]     at 
org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:751)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:498)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:453)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask$$Lambda$3795/0x00000008017a7440.apply(Unknown
 Source)
[error]     at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:491)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:430)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:496)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:393)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec$$Lambda$3504/0x00000008016f2040.apply(Unknown
 Source)
[error]     at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
[error]     at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
[error]     at org.apache.spark.scheduler.Task.run(Task.scala:141)
[error]     at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
[error]     at 
org.apache.spark.executor.Executor$TaskRunner$$Lambda$2232/0x0000000800fa6040.apply(Unknown
 Source)
[error] Driver stacktrace:
[error]     at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
[error]     at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
[error]     at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
[error]     at scala.collection.immutable.List.foreach(List.scala:334)
[error]     at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
[error]     at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
[error]     at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
[error]     at scala.Option.foreach(Option.scala:437)
[error]     at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
[error]     at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
[error]     at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
[error]     at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
[error]     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
[error]     at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
[error]     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:390)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:364)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:230)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:342)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:341)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:230)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
[error]     at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
[error]     at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
[error]     at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
[error]     at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
[error]     at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
[error]     at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
[error]     at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
[error]     at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
[error]     at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
[error]     at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
[error]     at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
[error]     at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
[error]     at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
[error]     at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
[error]     at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
[error]     at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
[error]     at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
[error]     at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
[error]     at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
[error]     at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
[error]     at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
[error]     at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
[error]     at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
[error]     at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:315)
[error]     at 
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
[error]     at 
com.techmonad.spark.IcebergDataGenerator$.main(IcebergDataGenerator.scala:77)
[error]     at 
com.techmonad.spark.IcebergDataGenerator.main(IcebergDataGenerator.scala) {code}
 
{quote}
 
 

*Questions:*

 

1. Can bucketing be effectively used with Iceberg tables in Spark?

2. What could be causing the OOM issue, and are there potential workarounds?

 

Let me know if you'd like any additional details added!

 

  was:
While attempting to ingest data into an Iceberg table on S3 using a Spark batch 
job, the process fails with an OOM error. Initial investigation suggests that 
the use of bucketing as a partitioning strategy may be the cause. When 
bucketing is removed, the code runs successfully.

 

Here is the current Spark code being used:
{quote} 
{code:java}
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
import java.sql.Date
import java.time.LocalDate
import scala.util.Random
/**
 * Generates synthetic customer rows and appends them to the Iceberg table
 * `rest.db.customers2` through a REST catalog.
 *
 * The table is partitioned by `bucket(1000, customer_id)` and `days(date)`.
 * After the append, the table is read back and its row count is logged.
 */
object IcebergDataGenerator {

  /**
   * Entry point: builds a local Spark session, creates the database and table
   * if they do not exist, writes 100,000 generated rows and logs the row count.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val logger: Logger = LoggerFactory.getLogger(this.getClass)

    // NOTE(review): with master "local[*]" the tasks run inside the JVM that is
    // already executing this code, so the driver/executor memory values set here
    // are presumably not applied to the running heap -- confirm, and size the heap
    // at JVM launch if a larger heap is actually required.
    val spark =
      SparkSession
        .builder()
        .appName("Iceberg Data Generator")
        .master("local[*]")
        .config("spark.driver.memory", "16g")
        .config("spark.executor.memory", "16g")
        .config("spark.driver.maxResultSize", "2g")
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.rest.type", "rest")
        .config("spark.sql.catalog.rest.uri", "http://127.0.0.1:9001/iceberg/")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.shuffle.partitions", "1000")
        .config("spark.default.parallelism", "32")
        .getOrCreate()
    import spark.implicits._

    // Stop the session even when table creation, the write or the count fails.
    try {
      spark.sql("CREATE DATABASE IF NOT EXISTS rest.db;")
      spark.sql(
        """
          |CREATE TABLE IF NOT EXISTS rest.db.customers2 (
          |  customer_id INT,
          |  customer_name STRING,
          |  date DATE,
          |  transaction_details STRING
          |) USING iceberg
          |PARTITIONED BY (bucket(1000, customer_id), days(date))
  """.stripMargin)

      // Builds one (customer_id, customer_name, date, transaction_details) tuple
      // per input number; the number itself is used as the customer id.
      def generateCustomerData(numbers: Seq[Int]): Seq[(Int, String, Date, String)] = {
        val random = new Random()
        numbers.map { i =>
          val customerId = i
          val customerName = s"Customer_$i"
          // Random date between today and 179 days ago (roughly the last 6 months).
          val date = Date.valueOf(LocalDate.now().minusDays(random.nextInt(180)))
          val transactionDetails = s"Transaction details for customer $i"
          (customerId, customerName, date, transactionDetails)
        }
      }

      // Generate 100,000 customers.
      val customerData = generateCustomerData(1 to 100000)

      // Convert to a DataFrame whose column names match the table schema.
      val customerDF = customerData.toDF("customer_id", "customer_name", "date", "transaction_details")

      // Log how many Spark partitions (and therefore write tasks) the input has.
      logger.info(s"partition count:  ${customerDF.rdd.getNumPartitions}")

      // Append the rows to the Iceberg table.
      // NOTE(review): rows are written in generation order, i.e. not grouped by
      // (bucket, day); each task may therefore touch many table partitions --
      // verify the memory impact of the writer for this layout.
      customerDF
        .write
        .format("iceberg")
        .mode("append")
        .save("rest.db.customers2")

      // Read the table back and log the total row count.
      val df = spark.sql("SELECT * FROM rest.db.customers2;")
      logger.info(s"Count:   ${df.count()}")
    } finally {
      // Stop Spark session
      spark.stop()
    }
  }
}
 
{code}
 
{quote}
 
 

*Questions:*

 

 1. Can bucketing be effectively used with Iceberg tables in Spark?

 2. What could be causing the OOM issue, and are there potential workarounds?

 

Let me know if you'd like any additional details added!

 


> Data ingestion into the Iceberg table (S3 bucket) via a Spark batch job is 
> failing due to an Out of Memory error.
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-50203
>                 URL: https://issues.apache.org/jira/browse/SPARK-50203
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core
>    Affects Versions: 3.5.3
>         Environment: Running on MacOS + 48GB Memory + 16Cores + M3
>            Reporter: Satendra Kumar
>            Priority: Major
>             Fix For: 4.0.0
>
>
> While attempting to ingest data into an Iceberg table on S3 using a Spark 
> batch job, the process fails with an OOM error. Initial investigation 
> suggests that the use of bucketing as a partitioning strategy may be the 
> cause. When bucketing is removed, the code runs successfully.
>  
> Here is the current Spark code being used:
> {quote} 
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.slf4j.{Logger, LoggerFactory}
> import java.sql.Date
> import java.time.LocalDate
> import scala.util.Random
> object IcebergDataGenerator {
>   def main(args: Array[String]): Unit = {
>     val logger: Logger = LoggerFactory.getLogger(this.getClass)
>     val spark =
>       SparkSession
>         .builder()
>         .appName("Iceberg Data Generator")
>         .master("local[*]")
>         .config("spark.driver.memory", "16g")
>         .config("spark.executor.memory", "16g")
>         .config("spark.driver.maxResultSize", "2g")
>         .config("spark.sql.extensions", 
> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
>         .config("spark.sql.catalog.rest", 
> "org.apache.iceberg.spark.SparkCatalog")
>         .config("spark.sql.catalog.rest.type", "rest")
>         .config("spark.sql.catalog.rest.uri", 
> "http://127.0.0.1:9001/iceberg/")
>         .config("spark.sql.adaptive.enabled", "true")
>         .config("spark.sql.shuffle.partitions", "1000")
>         .config("spark.default.parallelism", "32")
>         .getOrCreate()
>     import spark.implicits._
>     spark.sql("CREATE DATABASE IF NOT EXISTS rest.db;")
>     spark.sql(
>       """
>         |CREATE TABLE IF NOT EXISTS rest.db.customers2 (
>         |  customer_id INT,
>         |  customer_name STRING,
>         |  date DATE,
>         |  transaction_details STRING
>         |) USING iceberg
>         |PARTITIONED BY (bucket(1000, customer_id), days(date))
>   """.stripMargin)
>     // generate the data 
>     def generateCustomerData(numbers: Seq[Int]): Seq[(Int, String, Date, 
> String)] = {
>       val random = new Random()
>       numbers.map { i =>
>         val customerId = i
>         val customerName = s"Customer_$i"
>         val date = 
> Date.valueOf(LocalDate.now().minusDays(random.nextInt(180))) // Random date 
> within the last 6 months
>         val transactionDetails = s"Transaction details for customer $i"
>         (customerId, customerName, date, transactionDetails)
>       }
>     }
>     // Generate  1,00,000 users 
>     val customerData = generateCustomerData(1 to 100000)
>     // Convert to DataFrame
>     val customerDF = customerData.toDF("customer_id", "customer_name", 
> "date", "transaction_details")
>     // Write the data to an Apache Iceberg table
>     logger.info(s"partition count:  ${customerDF.rdd.getNumPartitions}")
>     customerDF
>       .write
>       .format("iceberg")
>       .mode("append")
>       .save("rest.db.customers2")
>     val df = spark.sql("SELECT * FROM rest.db.customers2;")
>     logger.info("Count:   " + df.count())
>     // Stop Spark session
>     spark.stop()
>   }
> }
> {code}
> {code:java}
> Here is the error:{code}
> {code:java}
>  
> [error] Exception in thread "main" org.apache.spark.SparkException: Job 
> aborted due to stage failure: Task 4 in stage 4.0 failed 1 times, most recent 
> failure: Lost task 4.0 in stage 4.0 (TID 506) (192.168.29.234 executor 
> driver): java.lang.OutOfMemoryError: Java heap space
> [error]     at 
> java.base/java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:79)
> [error]     at 
> org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.<init>(CodecFactory.java:158)
> [error]     at 
> org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory.createCompressor(CodecFactory.java:219)
> [error]     at 
> org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:202)
> [error]     at 
> org.apache.iceberg.parquet.ParquetWriter.<init>(ParquetWriter.java:90)
> [error]     at 
> org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:360)
> [error]     at 
> org.apache.iceberg.parquet.Parquet$DataWriteBuilder.build(Parquet.java:760)
> [error]     at 
> org.apache.iceberg.data.BaseFileWriterFactory.newDataWriter(BaseFileWriterFactory.java:131)
> [error]     at 
> org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:52)
> [error]     at 
> org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:32)
> [error]     at 
> org.apache.iceberg.io.RollingFileWriter.openCurrentWriter(RollingFileWriter.java:108)
> [error]     at 
> org.apache.iceberg.io.RollingDataWriter.<init>(RollingDataWriter.java:47)
> [error]     at 
> org.apache.iceberg.io.FanoutDataWriter.newWriter(FanoutDataWriter.java:53)
> [error]     at org.apache.iceberg.io.FanoutWriter.writer(FanoutWriter.java:63)
> [error]     at org.apache.iceberg.io.FanoutWriter.write(FanoutWriter.java:51)
> [error]     at 
> org.apache.iceberg.io.FanoutDataWriter.write(FanoutDataWriter.java:31)
> [error]     at 
> org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:781)
> [error]     at 
> org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:751)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:498)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:453)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.WritingSparkTask$$Lambda$3795/0x00000008017a7440.apply(Unknown
>  Source)
> [error]     at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:491)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:430)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:496)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:393)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec$$Lambda$3504/0x00000008016f2040.apply(Unknown
>  Source)
> [error]     at 
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
> [error]     at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
> [error]     at org.apache.spark.scheduler.Task.run(Task.scala:141)
> [error]     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
> [error]     at 
> org.apache.spark.executor.Executor$TaskRunner$$Lambda$2232/0x0000000800fa6040.apply(Unknown
>  Source)
> [error] Driver stacktrace:
> [error]     at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
> [error]     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
> [error]     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
> [error]     at scala.collection.immutable.List.foreach(List.scala:334)
> [error]     at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
> [error]     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
> [error]     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
> [error]     at scala.Option.foreach(Option.scala:437)
> [error]     at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
> [error]     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
> [error]     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
> [error]     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
> [error]     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> [error]     at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
> [error]     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:390)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:364)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:230)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:342)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:341)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:230)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
> [error]     at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
> [error]     at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
> [error]     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
> [error]     at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
> [error]     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
> [error]     at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
> [error]     at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
> [error]     at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
> [error]     at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
> [error]     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
> [error]     at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
> [error]     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
> [error]     at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
> [error]     at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
> [error]     at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
> [error]     at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
> [error]     at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
> [error]     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
> [error]     at 
> org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
> [error]     at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
> [error]     at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
> [error]     at 
> org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
> [error]     at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
> [error]     at 
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:315)
> [error]     at 
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
> [error]     at 
> com.techmonad.spark.IcebergDataGenerator$.main(IcebergDataGenerator.scala:77)
> [error]     at 
> com.techmonad.spark.IcebergDataGenerator.main(IcebergDataGenerator.scala) 
> {code}
>  
> {quote}
>  
>  
> *Questions:*
>  
> 1. Can bucketing be effectively used with Iceberg tables in Spark?
> 2. What could be causing the OOM issue, and are there potential workarounds?
>  
> Let me know if you'd like any additional details added!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to