suisenkotoba commented on issue #5424: URL: https://github.com/apache/iceberg/issues/5424#issuecomment-1451638558
anyone has any luck with this? I faced the same issue, I could not find what caused this. I am not sure if there's something wrong with my spark configuration. I use spark 3.3 on dataproc ([image version 2.1](https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.1)) with iceberg 1.1.0. The dataproc cluster already had dataproc metastore attached. This is my python code: ```python from pyspark.sql import SparkSession from pyspark import SparkConf from pyspark.sql.types import LongType, BooleanType, StructField, StructType conf = ( SparkConf() .setAppName('test_iceberg') .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') .set('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog') .set('spark.sql.catalog.spark_catalog.type', 'hive') .set(f'spark.sql.catalog.dev', 'org.apache.iceberg.spark.SparkCatalog') .set(f'spark.sql.catalog.dev.type', 'hive') ) spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate() df = spark.read.option("mergeSchema", "true").parquet('gs://poc-kafka/mis').sort("timestamp") # Create iceberg table schema = df.select("value.after.*").schema additional_fields = [StructField('start_at', LongType(), True), StructField('end_at', LongType(), True), StructField('is_current', BooleanType(), True)] schema = StructType(schema.fields + additional_fields) empty_df = spark.createDataFrame([], schema) spark.sql("CREATE DATABASE IF NOT EXISTS dev.dummy") empty_df.createOrReplaceTempView("empty") spark.sql("CREATE TABLE IF NOT EXISTS dev.dummy.fruit USING iceberg " "TBLPROPERTIES ('format-version'='2') " "AS (SELECT * FROM empty) ") # Merge table df.createOrReplaceTempView("changelog") sql = "MERGE INTO dev.dummy.fruit d " \ "USING (SELECT value.after.*, value.op op, " \ " value.source.ts_ms start_at, True is_current, " \ " NULL end_at " \ " FROM changelog) s " \ "ON d.id = s.id " \ "WHEN MATCHED THEN UPDATE SET d.is_current = False, d.end_at = s.start_at " \ "WHEN NOT MATCHED THEN INSERT * " spark.sql(sql) ``` This is the traceback I got: ``` /usr/lib/spark/python/pyspark/sql/session.py in sql(self, sqlQuery, **kwargs) 1032 sqlQuery = formatter.format(sqlQuery, **kwargs) 1033 try: -> 1034 return DataFrame(self._jsparkSession.sql(sqlQuery), self) 1035 finally: 1036 if len(kwargs) > 0: /usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args) 1319 1320 answer = self.gateway_client.send_command(command) -> 1321 return_value = get_return_value( 1322 answer, self.gateway_client, self.target_id, self.name) 1323 /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 188 def deco(*a: Any, **kw: Any) -> Any: 189 try: --> 190 return f(*a, **kw) 191 except Py4JJavaError as e: 192 converted = convert_exception(e.java_exception) /usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 325 if answer[1] == REFERENCE_TYPE: --> 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". 328 format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o67.sql. : java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily. at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:891) at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:821) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69) at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:459) at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:145) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184) at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145) at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:158) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184) at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:158) at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151) at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204) at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249) at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.base/java.lang.Thread.run(Thread.java:829) ``` If it helps, this is the dataproc cluster properties I included when creating the cluster: `--properties='^#^dataproc:pip.packages=google-cloud-secret-manager==2.15.0,numpy==1.24.1#spark:spark.jars=https://jdbc.postgresql.org/download/postgresql-42.5.1.jar,https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-3.3_2.13/1.1.0/iceberg-spark-runtime-3.3_2.13-1.1.0.jar#core:fs.defaultFS=gs://de-dev-dataproc-fs'` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
