a-agmon opened a new issue, #7021:
URL: https://github.com/apache/iceberg/issues/7021

   ### Apache Iceberg version
   
   1.1.0 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   We have a use case in which we read data from a few sources, transform it, and then merge it into an Iceberg table.
   We use Spark's observe API to track metrics on the data we read.
   The metrics are usually calculated on the executors when a computation is triggered, such as when a MERGE statement is executed.
   When the table does not exist and the merge essentially creates a new table, everything works and the observed metrics are reported as expected. However, when the table already contains data and the merge performs an update, the process fails with the following exception:
   ```
   Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple definitions of observed metrics named 'rowsObs': MergeIntoIcebergTable
   .....

       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:56)
       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:55)
       at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:187)
   ....
   ```
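
   My guess (I have not verified this) is that the MERGE rewrite duplicates the source plan, so the `CollectMetrics` node that `observe` adds appears twice under the same name but with different shapes, which Spark's analyzer then rejects.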
   
   Below is a full reproduction of the issue:
   the table is read -> the data is transformed -> an observation metric is added -> the merge is executed
   
   
   ```scala
   
   import org.apache.spark.SparkConf
   import org.apache.spark.sql.functions.lit
   import org.apache.spark.sql.util.QueryExecutionListener
   import org.apache.spark.sql._


   object ObservationBug {

     def getIcebergConfig(icebergCat: String, icebergS3WH: String): SparkConf = {
       val currentPath = System.getProperty("user.dir")
       new SparkConf()
         .set(s"spark.sql.catalog.$icebergCat", "org.apache.iceberg.spark.SparkCatalog")
         .set(s"spark.sql.catalog.$icebergCat.warehouse", s"$currentPath/$icebergS3WH")
         .set(s"spark.sql.catalog.$icebergCat.type", "hadoop")
         // add the extension for Iceberg procedures and MERGE support
         .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
     }

     def getMasterConfig: SparkConf = {
       new SparkConf()
         .set("spark.master", "local[*]")
         .set("spark.driver.host", "127.0.0.1")
         .set("spark.driver.bindAddress", "127.0.0.1")
     }
   
     def main(args: Array[String]): Unit = {
       implicit val spark: SparkSession = SparkSession.builder()
         .config(getMasterConfig)
         .config(getIcebergConfig("demo", "iceberg_wh"))
         .appName("ObservationDemo")
         .getOrCreate()

       // register a listener that prints the observed metrics when a query completes
       spark.listenerManager.register(new QueryExecutionListener {
         override def onSuccess(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, durationNs: Long): Unit = {
           for {
             (metricName, metricRow) <- qe.observedMetrics
             fieldName <- metricRow.schema.fieldNames
           } println(s"Metric: $metricName, fieldName: $fieldName, value: ${metricRow.getAs[Any](fieldName)}")
         }
         override def onFailure(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, exception: Exception): Unit = {}
       })
   
       spark.sql(
         """
           |DROP TABLE IF EXISTS demo.nyc.taxis;
           |""".stripMargin)
   
       spark.sql(
         """
           |CREATE TABLE demo.nyc.taxis
           |(
           |  vendor_id bigint,
           |  trip_id bigint,
           |  trip_distance float,
           |  fare_amount double,
           |  store_and_fwd_flag string
           |)
           |PARTITIONED BY (vendor_id);
           |""".stripMargin)
   
       spark.sql(
         """
           |INSERT INTO demo.nyc.taxis
           |VALUES (1, 1000371, 1.8, 15.32, 'N'),
           | (2, 1000372, 2.5, 22.15, 'N'),
           | (2, 1000373, 0.9, 9.01, 'N'),
           | (1, 1000374, 8.4, 42.13, 'Y');
           |""".stripMargin)
   
       // read -> transform -> observe -> merge
       val df = spark.sql("SELECT * FROM demo.nyc.taxis")
       val changedDF = df.withColumn("store_and_fwd_flag", lit("X"))
       val observedDF = changedDF.observe("rowsObs", functions.count("*"))
       observedDF.createOrReplaceTempView("observedDF")
       spark.sql(
         """
           |MERGE INTO demo.nyc.taxis target
           |USING (SELECT * FROM observedDF) source
           |ON source.trip_id = target.trip_id AND source.vendor_id = target.vendor_id
           |WHEN MATCHED THEN
           |UPDATE SET target.store_and_fwd_flag = source.store_and_fwd_flag
           |""".stripMargin)

       spark.sql("SELECT * FROM demo.nyc.taxis").show(10)
   
     }
   }
   
   ```
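
   For reference, a possible workaround I am considering (untested, and assuming the failure comes only from the duplicated `CollectMetrics` node inside the rewritten MERGE plan): trigger the observation with a separate action first, and register the un-observed DataFrame as the merge source. The `mergeSource` view name below is illustrative:

   ```scala
   // Hypothetical workaround sketch: materialize the metric with its own
   // action so the observe() node never ends up inside the MERGE source plan.
   val observedDF = changedDF.observe("rowsObs", functions.count("*"))
   observedDF.count() // triggers the observation; the listener fires here

   changedDF.createOrReplaceTempView("mergeSource") // no CollectMetrics node
   spark.sql(
     """
       |MERGE INTO demo.nyc.taxis target
       |USING (SELECT * FROM mergeSource) source
       |ON source.trip_id = target.trip_id AND source.vendor_id = target.vendor_id
       |WHEN MATCHED THEN
       |UPDATE SET target.store_and_fwd_flag = source.store_and_fwd_flag
       |""".stripMargin)
   ```

   The obvious cost is an extra pass over the source data for the separate count action.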

