Reo-LEI commented on issue #2533:
URL: https://github.com/apache/iceberg/issues/2533#issuecomment-833595041
I encountered the same error when using the MERGE INTO clause, and I found that if the target table doesn't match any source data, this IllegalArgumentException is thrown.
For example, I have some logs in a source table, and I need to aggregate the logs into events and write them to a destination table.
src (user_logs) table:

|log_time|log_type|user_name|
|---|---|---|
|2021-05-06 12:01:00|heartbeat|Bob|
|2021-05-06 12:03:00|heartbeat|Bob|
|2021-05-06 12:05:00|heartbeat|Bob|
|2021-05-06 12:05:00|login|John|
|2021-05-06 12:05:30|heartbeat|John|
|2021-05-06 12:06:00|heartbeat|John|
|2021-05-06 12:06:30|logout|John|
dst (user_events) table:

|start_time|end_time|user_name|
|---|---|---|
|2021-05-06 12:01:00|2021-05-06 12:03:00|Bob|
If I execute the following SQL, the error occurs.
```
MERGE INTO iceberg.iceberg_db.user_events AS dst
USING (
    SELECT MIN(log_time) AS start_time,
           MAX(log_time) AS end_time,
           user_name
    FROM iceberg.iceberg_db.user_logs
    WHERE log_time >= '2021-05-06 12:05:00'
      AND log_time < '2021-05-06 12:10:00'
    GROUP BY user_name
) AS src
ON src.user_name = dst.user_name
AND src.start_time = dst.end_time
WHEN MATCHED THEN UPDATE SET dst.end_time = src.end_time
WHEN NOT MATCHED THEN INSERT *;
```
The stack trace is as follows:
```
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(1, 0)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:58)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:366)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
    at org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.writeWithV2(ReplaceDataExec.scala:26)
    at org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.run(ReplaceDataExec.scala:34)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
    at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
But if I change the log_time range to ['2021-05-06 12:03:00', '2021-05-06 12:10:00'), the SQL succeeds.
I guess this is because the dst table contains a record with `user_name = 'Bob'` and `end_time = '2021-05-06 12:03:00'`, which satisfies the `src.user_name = dst.user_name AND src.start_time = dst.end_time` condition, so the matched side of the MERGE is not empty.
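The message itself comes from `org.apache.spark.rdd.ZippedPartitionsBaseRDD`, which requires every zipped RDD to have the same number of partitions. Below is a minimal sketch of that failure mode in plain Spark; it only reproduces the `List(1, 0)` shape of the error with toy RDDs and is not Iceberg's actual write path:
```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: zipping a 1-partition RDD with a 0-partition RDD fails the
// same way the MERGE above does. The names and data here are illustrative only.
object ZipMismatchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("zip-sketch").getOrCreate()
    val sc = spark.sparkContext

    val onePart  = sc.parallelize(Seq(1, 2, 3), numSlices = 1) // 1 partition
    val zeroPart = sc.emptyRDD[Int]                            // 0 partitions

    // Throws: java.lang.IllegalArgumentException:
    //   Can't zip RDDs with unequal numbers of partitions: List(1, 0)
    onePart.zipPartitions(zeroPart) { (a, b) => a ++ b }.count()
  }
}
```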
My environment is Spark 3.0.2 and Iceberg master. The issue can be reproduced by executing the SQL below:
```
CREATE TABLE iceberg.iceberg_db.user_logs (
    log_time STRING,
    log_type STRING,
    user_name STRING
) USING iceberg
TBLPROPERTIES (engine.hive.enabled='true');

INSERT INTO iceberg.iceberg_db.user_logs
VALUES ('2021-05-06 12:01:00', 'heartbeat', 'Bob'),
       ('2021-05-06 12:03:00', 'heartbeat', 'Bob'),
       ('2021-05-06 12:05:00', 'heartbeat', 'Bob'),
       ('2021-05-06 12:05:00', 'login', 'John'),
       ('2021-05-06 12:05:30', 'heartbeat', 'John'),
       ('2021-05-06 12:06:00', 'heartbeat', 'John'),
       ('2021-05-06 12:06:30', 'logout', 'John');

CREATE TABLE iceberg.iceberg_db.user_events (
    start_time STRING,
    end_time STRING,
    user_name STRING
) USING iceberg
TBLPROPERTIES (engine.hive.enabled='true');

INSERT INTO iceberg.iceberg_db.user_events
VALUES ('2021-05-06 12:01:00', '2021-05-06 12:03:00', 'Bob');

-- Fails: no src row in ['2021-05-06 12:05:00', '2021-05-06 12:10:00') matches a dst row.
MERGE INTO iceberg.iceberg_db.user_events AS dst
USING (
    SELECT MIN(log_time) AS start_time,
           MAX(log_time) AS end_time,
           user_name
    FROM iceberg.iceberg_db.user_logs
    WHERE log_time >= '2021-05-06 12:05:00'
      AND log_time < '2021-05-06 12:10:00'
    GROUP BY user_name
) AS src
ON src.user_name = dst.user_name
AND src.start_time = dst.end_time
WHEN MATCHED THEN UPDATE SET dst.end_time = src.end_time
WHEN NOT MATCHED THEN INSERT *;

-- Succeeds: the wider range makes Bob's start_time match the existing dst end_time.
MERGE INTO iceberg.iceberg_db.user_events AS dst
USING (
    SELECT MIN(log_time) AS start_time,
           MAX(log_time) AS end_time,
           user_name
    FROM iceberg.iceberg_db.user_logs
    WHERE log_time >= '2021-05-06 12:03:00'
      AND log_time < '2021-05-06 12:10:00'
    GROUP BY user_name
) AS src
ON src.user_name = dst.user_name
AND src.start_time = dst.end_time
WHEN MATCHED THEN UPDATE SET dst.end_time = src.end_time
WHEN NOT MATCHED THEN INSERT *;
```
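To check the guess that the matched side is empty, one can look at how many partitions Spark plans for an equivalent inner join. The probe below is my own reconstruction for this comment (run in spark-shell, where `spark` is predefined), not something Iceberg runs internally, and the expected `0` is an assumption based on the hypothesis above:
```scala
// Hypothetical probe: rebuild the MATCHED side of the failing MERGE as a plain
// join and check its planned partition count. An empty result planned with 0
// partitions would line up with the List(1, 0) in the exception.
val matchedSide = spark.sql(
  """SELECT dst.*
    |FROM iceberg.iceberg_db.user_events dst
    |JOIN (SELECT MIN(log_time) AS start_time,
    |             MAX(log_time) AS end_time,
    |             user_name
    |      FROM iceberg.iceberg_db.user_logs
    |      WHERE log_time >= '2021-05-06 12:05:00'
    |        AND log_time < '2021-05-06 12:10:00'
    |      GROUP BY user_name) src
    |  ON src.user_name = dst.user_name
    | AND src.start_time = dst.end_time""".stripMargin)

println(matchedSide.rdd.getNumPartitions) // 0 under the empty-matched-side hypothesis
```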
Experiment result (screenshots omitted): the first MERGE fails with the exception above; the second succeeds.