mohitgargk opened a new issue, #5739:
URL: https://github.com/apache/iceberg/issues/5739
Spark 3.1.2 + Iceberg 0.14.1 has been running successfully.
Spark 3.2.0 + Iceberg 0.14.1 fails with the following error.
**Command**
```
./bin/spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.14.1 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.local.type=hadoop \
  --conf spark.sql.catalog.local.warehouse=$PWD/warehouse
```
**Exception**
```
org.apache.spark.sql.AnalysisException: unresolved operator 'ReplaceData RelationV2[id#219, firstname#220, lastname#221, age#222, date_id#223] local.db.target;
'MergeIntoIcebergTable (cast(id#219 as bigint) = id#224L), [deleteaction(Some(((operation_type#230 = DELETE) AND (isnull(date_id#223) OR (arrival_time#229 > date_id#223))))), updateaction(Some((((operation_type#230 = UPSERT) OR (operation_type#230 = APPEND)) AND (isnull(date_id#223) OR (arrival_time#229 > date_id#223)))), assignment(id#219, ansi_cast(id#224L as string)), assignment(firstname#220, firstname#225), assignment(lastname#221, lastname#226), assignment(age#222, age#227), assignment(date_id#223, date_id#228))], [insertaction(Some(NOT (operation_type#230 = DELETE)), assignment(id#219, ansi_cast(id#224L as string)), assignment(firstname#220, firstname#225), assignment(lastname#221, lastname#226), assignment(age#222, age#227), assignment(date_id#223, date_id#228))]
:- SubqueryAlias target
:  +- SubqueryAlias local.db.target
:     +- RelationV2[id#219, firstname#220, lastname#221, age#222, date_id#223] local.db.target
:- SubqueryAlias source
:  +- SubqueryAlias local.db.source
:     +- RelationV2[id#224L, firstname#225, lastname#226, age#227, date_id#228, arrival_time#229, operation_type#230] local.db.source
+- 'ReplaceData
   +- MergeRows[id#219, firstname#220, lastname#221, age#222, date_id#223, _file#233]
      +- Join FullOuter, (cast(id#219 as bigint) = id#244L), leftHint=(strategy=no_broadcast_hash)
         :- NoStatsUnaryNode
         :  +- Project [id#219, firstname#220, lastname#221, age#222, date_id#223, _file#233, true AS __row_from_target#236, monotonically_increasing_id() AS __row_id#237L]
         :     +- RelationV2[id#219, firstname#220, lastname#221, age#222, date_id#223, _file#233] local.db.target
         +- Project [id#244L, firstname#245, lastname#246, age#247, date_id#248, arrival_time#249, operation_type#250, true AS __row_from_source#238]
            +- SubqueryAlias source
               +- SubqueryAlias local.db.source
                  +- RelationV2[id#244L, firstname#245, lastname#246, age#247, date_id#248, arrival_time#249, operation_type#250] local.db.source
```
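Note that the plan shows the join key types differ between the two tables: `target.id` is `string` while `source.id` is `bigint`, so the analyzer injects `cast(id#219 as bigint)` into the ON condition and `ansi_cast(id#224L as string)` into the assignments. Below is a stripped-down sketch of that same schema mismatch; the table names `local.db.t` / `local.db.s` are illustrative only, and it is unverified whether this minimal form by itself triggers the unresolved `ReplaceData`. The full reproduction follows after it.

```scala
// Hypothetical minimal setup (illustrative names, not from the full repro below):
// string join key on the target table, bigint join key on the source table.
spark.sql("CREATE TABLE local.db.t (id string NOT NULL) USING iceberg TBLPROPERTIES ('format-version'='2')")
spark.sql("CREATE TABLE local.db.s (id bigint NOT NULL) USING iceberg TBLPROPERTIES ('format-version'='2')")

// MERGE across the mismatched key types forces casts into the merge condition
// and assignments, as seen in the failing plan above.
spark.sql("""MERGE INTO local.db.t AS t
  USING local.db.s AS s ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET t.id = s.id
  WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)""")
```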
**spark-shell code**
```scala
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.iceberg.{PartitionSpec, Schema, Table}
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.types.Types
import org.apache.iceberg.TableProperties
import spark.implicits._
import java.sql.Timestamp

// Utility
def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
  val schema = df.schema
  val newSchema = StructType(schema.map {
    case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
    case y: StructField => y
  })
  df.sqlContext.createDataFrame(df.rdd, newSchema)
}

// 1. Prepare target data
spark.sql("drop table local.db.target")
spark.sql("CREATE TABLE local.db.target (id string not null, firstname string, lastname string, age int, date_id timestamp) USING iceberg");
val namespace = "db"
val target = "target"
val targetName = TableIdentifier.of(namespace, target)
val catalog = new HadoopCatalog(spark.sparkContext.hadoopConfiguration, "file:///Users/mohit.garg/spark_iceberg/spark-3.2.0-bin-hadoop3.2/warehouse")
val targetTable = catalog.loadTable(targetName)
targetTable.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
val ts = Timestamp.valueOf("2022-01-01 00:00:00");
val range = (1 to 10).toList
val targetData = range.map(id => (id.toString, "", "", 0, ts))
var targetDf = spark.createDataFrame(targetData).toDF("id", "firstname", "lastname", "age", "date_id")
val newTargetDf = setNullableStateOfColumn(targetDf, "id", false)
newTargetDf.registerTempTable("targetDf")
spark.sql("INSERT INTO local.db.target SELECT * from targetDf")
spark.sql("select * from local.db.target").show

// 2. Prepare source data
spark.sql("drop table local.db.source")
spark.sql("CREATE TABLE local.db.source (id bigint not null, firstname string, lastname string, age int, date_id timestamp, arrival_time timestamp, operation_type string) USING iceberg");
val source = "source"
val sourceName = TableIdentifier.of(namespace, source)
val sourceTable = catalog.loadTable(sourceName)
sourceTable.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
val sourceData = Seq(
  (1, "mohit", "garg", 1, Timestamp.valueOf("2022-01-01 00:00:01"), Timestamp.valueOf("2022-01-01 00:00:01"), "UPSERT"),
  (2, "adam", "hancock", 1, Timestamp.valueOf("2022-01-01 00:00:01"), Timestamp.valueOf("2022-01-01 00:00:01"), "APPEND"),
  (3, "pradeep", "venkat", 1, Timestamp.valueOf("2022-01-01 00:00:01"), Timestamp.valueOf("2022-01-01 00:00:01"), "DELETE"));
var sourceDf = spark.createDataFrame(sourceData).toDF("id", "firstname", "lastname", "age", "date_id", "arrival_time", "operation_type")
sourceDf.registerTempTable("sourceDf")
val newSourceDf = setNullableStateOfColumn(sourceDf, "id", false)
newSourceDf.registerTempTable("sourceDf")
spark.sql("INSERT INTO local.db.source SELECT * from sourceDf")
spark.sql("select * from local.db.source").show

// 3. MergeInto
spark.sql("""MERGE INTO local.db.target as target
  USING local.db.source as source ON target.id = source.id
  WHEN MATCHED AND source.operation_type = 'DELETE' AND (target.date_id IS NULL OR source.arrival_time > target.date_id)
    THEN DELETE
  WHEN MATCHED AND (source.operation_type = 'UPSERT' OR source.operation_type = 'APPEND') AND (target.date_id IS NULL OR source.arrival_time > target.date_id)
    THEN UPDATE SET target.id = source.id, target.firstname = source.firstname, target.lastname = source.lastname, target.age = source.age, target.date_id = source.date_id
  WHEN NOT MATCHED AND source.operation_type != 'DELETE'
    THEN INSERT (`id`, `firstname`, `lastname`, `age`, `date_id`) VALUES (source.`id`, source.`firstname`, source.`lastname`, source.`age`, source.`date_id`)""")
spark.sql("select * from local.db.target").show
```