This is an automated email from the ASF dual-hosted git repository.
lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a7ea25a72e HIVE-26133: Insert overwrite on Iceberg tables can result
in duplicate entries after partition evolution (#3202) (Laszlo Pinter, reviewed
by Marton Bod and Peter Vary)
a7ea25a72e is described below
commit a7ea25a72ec5334d3cac15f503b651de8200ff9c
Author: László Pintér <[email protected]>
AuthorDate: Thu Apr 14 15:22:42 2022 +0200
HIVE-26133: Insert overwrite on Iceberg tables can result in duplicate
entries after partition evolution (#3202) (Laszlo Pinter, reviewed by Marton
Bod and Peter Vary)
---
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 10 ++++++++++
.../apache/iceberg/mr/hive/TestHiveIcebergInserts.java | 18 ++++++++++++++++++
2 files changed, 28 insertions(+)
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index e68458eafe..4c82eb78cd 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
@@ -460,6 +461,15 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
if (IcebergTableUtil.isBucketed(table)) {
throw new SemanticException("Cannot perform insert overwrite query on
bucket partitioned Iceberg table.");
}
+ if (table.currentSnapshot() != null) {
+ if
(table.currentSnapshot().allManifests().parallelStream().map(ManifestFile::partitionSpecId)
+ .anyMatch(id -> id < table.spec().specId())) {
+ throw new SemanticException(
+ "Cannot perform insert overwrite query on Iceberg table where
partition evolution happened. In order " +
+ "to succesfully carry out any insert overwrite operation on this
table, the data has to be rewritten " +
+ "conforming to the latest spec. ");
+ }
+ }
}
}
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
index 8545447cd2..f38eea1969 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
@@ -183,6 +183,24 @@ public class TestHiveIcebergInserts extends
HiveIcebergStorageHandlerWithEngineB
testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS,
target, true)));
}
+ @Test
+ public void testInsertOverwriteWithPartitionEvolutionThrowsError() throws
IOException {
+ TableIdentifier target = TableIdentifier.of("default", "target");
+ Table table = testTables.createTable(shell, target.name(),
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+ shell.executeStatement("ALTER TABLE target SET PARTITION SPEC(TRUNCATE(2,
last_name))");
+ List<Record> newRecords =
TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .add(0L, "Mike", "Taylor")
+ .add(1L, "Christy", "Hubert")
+ .build();
+ AssertHelpers.assertThrows("IOW should not work on tables with partition
evolution",
+ IllegalArgumentException.class,
+ "Cannot perform insert overwrite query on Iceberg table where
partition evolution happened.",
+ () -> shell.executeStatement(testTables.getInsertQuery(newRecords,
target, true)));
+ // TODO: add further test cases once merge + compaction is supported in
Hive, since that will allow us to
+ // rewrite the data
+ }
+
/**
* Testing map-reduce inserts.
* @throws IOException If there is an underlying IOException