Reo-LEI commented on issue #2533:
URL: https://github.com/apache/iceberg/issues/2533#issuecomment-833595041


   I was encounter the same error when I use MERGE INTO clause. And I found if 
the target table not match any data, the IllegalArgumentException will be throw.
   
   For example, I have some log in source table, and I need to aggregate logs 
into events and write to destination table.
   
   src(user_logs) table:
   |log_time|log_type|user_name|
   |---|---|---|
   |2021-05-06 12:01:00|heartbeat|Bob
   |2021-05-06 12:03:00|heartbeat|Bob
   |2021-05-06 12:05:00|heartbeat|Bob
   |2021-05-06 12:05:00|login|John
   |2021-05-06 12:05:30|heartbeat|John
   |2021-05-06 12:06:00|heartbeat|John
   |2021-05-06 12:06:30|logout|John
   
   dst(user_events) table:
   |start_time|end_time|user_name|
   |---|---|---|
   |2021-05-06 12:01:00|2021-05-06 12:03:00|Bob|
   
   
   If I execute the following SQL, the error will happend.
   ```
   MERGE INTO iceberg.iceberg_db.user_events AS dst
   USING(
       SELECT MIN(log_time) AS start_time,
              MAX(log_time) AS end_time,
              user_name
       FROM iceberg.iceberg_db.user_logs
       WHERE log_time >= '2021-05-06 12:05:00'
         AND log_time < '2021-05-06 12:10:00'
       GROUP BY user_name
   ) AS src
   ON  src.user_name = dst.user_name
   AND src.start_time = dst.end_time
   WHEN MATCHED THEN UPDATE SET dst.end_time = src.end_time
   WHEN NOT MATCHED THEN INSERT *;
   ```
   And the stack as follow:
   ```
   java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of 
partitions: List(1, 0)
        at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:58)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
        at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:366)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
        at 
org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.writeWithV2(ReplaceDataExec.scala:26)
        at 
org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.run(ReplaceDataExec.scala:34)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
        at 
org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
        at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of 
partitions: List(1, 0)
        at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:58)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
        at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:366)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
        at 
org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.writeWithV2(ReplaceDataExec.scala:26)
        at 
org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.run(ReplaceDataExec.scala:34)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
        at 
org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
        at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   ```
   
   But if I change the log_time range to (2021-05-06 12:03:00, 2021-05-06 
12:10:00), the SQL will be success.
   
    
   I guest it's because dst table contain `user_name = 'Bob'` AND `end_time = 
'2021-05-06 12:03:00'` record, that is match the `src.user_name = dst.user_name 
AND src.start_time = dst.end_time` condition and make dst table is not empty.
   
   
   My env is spark == 3.0.2 and Iceberg == master. We can reproduce this by 
execute the sql as below:
   
   ```
   CREATE TABLE iceberg.iceberg_db.user_logs (
       log_time STRING,
       log_type STRING,
       user_name STRING
   ) USING iceberg
   TBLPROPERTIES (engine.hive.enabled='true');
   
   INSERT INTO iceberg.iceberg_db.user_logs
   VALUES ('2021-05-06 12:01:00', 'heartbeat', 'Bob'),
          ('2021-05-06 12:03:00', 'heartbeat', 'Bob'),
          ('2021-05-06 12:05:00', 'heartbeat', 'Bob'),
          ('2021-05-06 12:05:00', 'login', 'John'),
          ('2021-05-06 12:05:30', 'heartbeat', 'John'),
          ('2021-05-06 12:06:00', 'heartbeat', 'John'),
          ('2021-05-06 12:06:30', 'logout', 'John');
   
   
   CREATE TABLE iceberg.iceberg_db.user_events (
       start_time STRING,
       end_time STRING,
       user_name STRING
   ) USING iceberg
   TBLPROPERTIES (engine.hive.enabled='true');
   
   INSERT INTO iceberg.iceberg_db.user_events
   VALUES ('2021-05-06 12:01:00', '2021-05-06 12:03:00', 'Bob');
   
   
   MERGE INTO iceberg.iceberg_db.user_events AS dst
   USING(
       SELECT MIN(log_time) AS start_time,
              MAX(log_time) AS end_time,
              user_name
       FROM iceberg.iceberg_db.user_logs
       WHERE log_time >= '2021-05-06 12:05:00'
         AND log_time < '2021-05-06 12:10:00'
       GROUP BY user_name
   ) AS src
   ON  src.user_name = dst.user_name
   AND src.start_time = dst.end_time
   WHEN MATCHED THEN UPDATE SET dst.end_time = src.end_time
   WHEN NOT MATCHED THEN INSERT *;
   
   
   MERGE INTO iceberg.iceberg_db.user_events AS dst
   USING(
       SELECT MIN(log_time) AS start_time,
              MAX(log_time) AS end_time,
              user_name
       FROM iceberg.iceberg_db.user_logs
       WHERE log_time >= '2021-05-06 12:03:00'
         AND log_time < '2021-05-06 12:10:00'
       GROUP BY user_name
   ) AS src
   ON  src.user_name = dst.user_name
   AND src.start_time = dst.end_time
   WHEN MATCHED THEN UPDATE SET dst.end_time = src.end_time
   WHEN NOT MATCHED THEN INSERT *;
   ```
   
   Experiment result:
   
![image](https://user-images.githubusercontent.com/17312872/117320598-f9a08d00-aebe-11eb-8099-402de5d5c8e8.png)
   
![image](https://user-images.githubusercontent.com/17312872/117320668-09b86c80-aebf-11eb-9f83-18e48573561d.png)
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to