suisenkotoba commented on issue #5424:
URL: https://github.com/apache/iceberg/issues/5424#issuecomment-1451638558

   Has anyone had any luck with this? I am facing the same issue and could not find what causes it.
   I am not sure whether something is wrong with my Spark configuration. I am using Spark 3.3 on Dataproc ([image version 2.1](https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.1)) with Iceberg 1.1.0. The Dataproc cluster already has a Dataproc Metastore attached.
   This is my Python code:
   ```python
    from pyspark.sql import SparkSession
    from pyspark import SparkConf
    from pyspark.sql.types import LongType, BooleanType, StructField, StructType


    conf = (
        SparkConf()
        .setAppName('test_iceberg')
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        .set('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog')
        .set('spark.sql.catalog.spark_catalog.type', 'hive')
        .set('spark.sql.catalog.dev', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.dev.type', 'hive')
    )
    spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

    df = spark.read.option("mergeSchema", "true").parquet('gs://poc-kafka/mis').sort("timestamp")

    # Create iceberg table
    schema = df.select("value.after.*").schema
    additional_fields = [StructField('start_at', LongType(), True),
                         StructField('end_at', LongType(), True),
                         StructField('is_current', BooleanType(), True)]
    schema = StructType(schema.fields + additional_fields)
    empty_df = spark.createDataFrame([], schema)
    spark.sql("CREATE DATABASE IF NOT EXISTS dev.dummy")
    empty_df.createOrReplaceTempView("empty")
    spark.sql("CREATE TABLE IF NOT EXISTS dev.dummy.fruit USING iceberg "
              "TBLPROPERTIES ('format-version'='2') "
              "AS (SELECT * FROM empty) ")

    # Merge table
    df.createOrReplaceTempView("changelog")
    sql = "MERGE INTO dev.dummy.fruit d " \
          "USING (SELECT value.after.*, value.op op, " \
          "            value.source.ts_ms start_at, True is_current, " \
          "            NULL end_at " \
          "        FROM changelog) s " \
          "ON d.id = s.id " \
          "WHEN MATCHED THEN UPDATE SET d.is_current = False, d.end_at = s.start_at " \
          "WHEN NOT MATCHED THEN INSERT * "
    spark.sql(sql)

   ```
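   For completeness, here is the kind of sanity check one could run in the same session to confirm that the catalog settings above actually took effect and that the table was created as an Iceberg table. This is only a sketch against the table name from my snippet, not something I am presenting as a fix:
   ```python
    # Sketch of a sanity check; assumes the session and the dev.dummy.fruit table
    # from the snippet above already exist.
    print(spark.conf.get('spark.sql.extensions'))    # expect IcebergSparkSessionExtensions
    print(spark.conf.get('spark.sql.catalog.dev'))   # expect org.apache.iceberg.spark.SparkCatalog

    # The detailed table metadata should reference iceberg as the provider
    # if the CTAS really went through the Iceberg catalog.
    spark.sql("DESCRIBE TABLE EXTENDED dev.dummy.fruit").show(truncate=False)
   ```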
   
   This is the traceback I got:
   ```
   /usr/lib/spark/python/pyspark/sql/session.py in sql(self, sqlQuery, **kwargs)
      1032             sqlQuery = formatter.format(sqlQuery, **kwargs)
      1033         try:
   -> 1034             return DataFrame(self._jsparkSession.sql(sqlQuery), self)
      1035         finally:
      1036             if len(kwargs) > 0:
   
    /usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
      1319 
      1320         answer = self.gateway_client.send_command(command)
   -> 1321         return_value = get_return_value(
      1322             answer, self.gateway_client, self.target_id, self.name)
      1323 
   
   /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
       188     def deco(*a: Any, **kw: Any) -> Any:
       189         try:
   --> 190             return f(*a, **kw)
       191         except Py4JJavaError as e:
       192             converted = convert_exception(e.java_exception)
   
    /usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
        324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
       325             if answer[1] == REFERENCE_TYPE:
   --> 326                 raise Py4JJavaError(
       327                     "An error occurred while calling {0}{1}{2}.\n".
       328                     format(target_id, ".", name), value)
   
    Py4JJavaError: An error occurred while calling o67.sql.
    : java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:891)
        at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:821)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
        at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:459)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:145)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:158)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:158)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
   ```
   If it helps, these are the Dataproc cluster properties I passed when creating the cluster:
   
`--properties='^#^dataproc:pip.packages=google-cloud-secret-manager==2.15.0,numpy==1.24.1#spark:spark.jars=https://jdbc.postgresql.org/download/postgresql-42.5.1.jar,https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-3.3_2.13/1.1.0/iceberg-spark-runtime-3.3_2.13-1.1.0.jar#core:fs.defaultFS=gs://de-dev-dataproc-fs'`
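
   For comparison, I believe the same runtime could also be resolved from Maven at session start instead of via a direct URL in `spark.jars`. This is just a sketch, not something I have verified on this image, and the artifact's Scala suffix (2.12 vs 2.13) is an assumption that would need to match the cluster's Spark build:
   ```python
    # Hypothetical alternative: let Spark pull the Iceberg runtime from Maven.
    # The Scala suffix below is an assumption, not verified against Dataproc 2.1.
    conf = conf.set('spark.jars.packages',
                    'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0')
   ```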

