a-agmon opened a new issue, #7021:
URL: https://github.com/apache/iceberg/issues/7021
### Apache Iceberg version
1.1.0 (latest release)
### Query engine
Spark
### Please describe the bug 🐞
We have a use case in which we read data from a few sources, transform
it, and then merge it into an Iceberg table.
We use Spark's `observe` API to track metrics on the data we read.
The metrics are usually calculated on the executors when a computation is
triggered, such as when a MERGE statement is executed.
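For context, the observation is attached to the DataFrame like this (a minimal sketch; the metric name and aggregate are the same ones used in the full reproduction below):

```scala
import org.apache.spark.sql.functions.count

// Attach a named metric to the DataFrame's plan; Spark computes it on the
// executors whenever an action runs over this plan and reports it to any
// registered QueryExecutionListener.
val observedDF = df.observe("rowsObs", count("*"))
```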
When the table does not exist and the merge effectively creates a
new table, everything works and the observed metrics are
reported as expected. However, when the table already contains data and the merge
performs an update, the process fails with the following exception:
```
Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple definitions of observed metrics named 'rowsObs': MergeIntoIcebergTable
.....
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:56)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:55)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:187)
....
```
Below is a full reproduction of the issue:
table is read -> table is transformed -> observation metric is added -> merge
is executed
```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.sql._

object ObservationBug {

  def getIcebergConfig(icebergCat: String, icebergS3WH: String): SparkConf = {
    val currentPath = System.getProperty("user.dir")
    new SparkConf()
      .set(s"spark.sql.catalog.$icebergCat", "org.apache.iceberg.spark.SparkCatalog")
      .set(s"spark.sql.catalog.$icebergCat.warehouse", s"$currentPath/$icebergS3WH")
      .set(s"spark.sql.catalog.$icebergCat.type", "hadoop")
      // add extension for Iceberg procedures
      .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  }

  def getMasterConfig: SparkConf = {
    new SparkConf()
      .set("spark.master", "local[*]")
      .set("spark.driver.host", "127.0.0.1")
      .set("spark.driver.bindAddress", "127.0.0.1")
  }

  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder()
      .config(getMasterConfig)
      .config(getIcebergConfig("demo", "iceberg_wh"))
      .appName("ObservationDemo")
      .getOrCreate()

    // register a listener to print the observed metrics
    spark.listenerManager.register(new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, durationNs: Long): Unit = {
        for {
          (metricName, metricRow) <- qe.observedMetrics
          fieldName <- metricRow.schema.fieldNames
        } println(s"Metric: $metricName, fieldName: $fieldName, value: ${metricRow.getAs(fieldName)}")
      }

      override def onFailure(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, exception: Exception): Unit = {}
    })

    spark.sql(
      """
        |DROP TABLE IF EXISTS demo.nyc.taxis;
        |""".stripMargin)

    spark.sql(
      """
        |CREATE TABLE demo.nyc.taxis
        |(
        |  vendor_id bigint,
        |  trip_id bigint,
        |  trip_distance float,
        |  fare_amount double,
        |  store_and_fwd_flag string
        |)
        |PARTITIONED BY (vendor_id);
        |""".stripMargin)

    spark.sql(
      """
        |INSERT INTO demo.nyc.taxis
        |VALUES (1, 1000371, 1.8, 15.32, 'N'),
        |       (2, 1000372, 2.5, 22.15, 'N'),
        |       (2, 1000373, 0.9, 9.01, 'N'),
        |       (1, 1000374, 8.4, 42.13, 'Y');
        |""".stripMargin)

    val df = spark.sql("SELECT * FROM demo.nyc.taxis")
    val changedDF = df.withColumn("store_and_fwd_flag", lit("X"))
    val observedDF = changedDF.observe("rowsObs", functions.count("*"))
    observedDF.createOrReplaceTempView("observedDF")

    spark.sql(
      """
        |MERGE INTO demo.nyc.taxis target
        |USING (SELECT * FROM observedDF) source
        |ON source.trip_id = target.trip_id AND source.vendor_id = target.vendor_id
        |WHEN MATCHED THEN
        |  UPDATE SET target.store_and_fwd_flag = source.store_and_fwd_flag
        |""".stripMargin)

    spark.sql("SELECT * FROM demo.nyc.taxis").show(10)
  }
}
```
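A possible workaround sketch (not necessarily the right fix, and untested against this exact scenario): materializing the observed DataFrame before the MERGE, e.g. by writing it to a staging table, so that the plan containing the observed metric executes exactly once on its own and the MERGE reads from a plain table. The staging table name here is illustrative:

```scala
// Materialize the observed source first; this action triggers the metric
// computation and the listener callback. The subsequent MERGE plan then
// contains no observed metrics, so the duplicate-definition check cannot fire.
observedDF.writeTo("demo.nyc.taxis_staging").createOrReplace()

spark.sql(
  """
    |MERGE INTO demo.nyc.taxis target
    |USING demo.nyc.taxis_staging source
    |ON source.trip_id = target.trip_id AND source.vendor_id = target.vendor_id
    |WHEN MATCHED THEN
    |  UPDATE SET target.store_and_fwd_flag = source.store_and_fwd_flag
    |""".stripMargin)
```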