This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f271bd  Hive: Fix multi-table insert issue with Tez (#2502)
4f271bd is described below

commit 4f271bd3a4478ec802fe256e512635aea576cf0e
Author: Marton Bod <[email protected]>
AuthorDate: Mon Apr 26 17:00:11 2021 +0200

    Hive: Fix multi-table insert issue with Tez (#2502)
---
 .../iceberg/mr/hive/HiveIcebergOutputCommitter.java  | 20 +++++++++++++-------
 .../TestHiveIcebergStorageHandlerWithEngine.java     | 18 ++++++++++++++++--
 .../java/org/apache/iceberg/mr/hive/TestTables.java  |  9 +++++++++
 3 files changed, 38 insertions(+), 9 deletions(-)

diff --git 
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java 
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index bf298ad..a7d7891 100644
--- 
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ 
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -105,13 +105,19 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
           .executeWith(tableExecutor)
           .run(output -> {
             Table table = 
HiveIcebergStorageHandler.table(context.getJobConf(), output);
-            HiveIcebergRecordWriter writer = writers.get(output);
-            DataFile[] closedFiles = writer != null ? writer.dataFiles() : new 
DataFile[0];
-            String fileForCommitLocation = 
generateFileForCommitLocation(table.location(), jobConf,
-                attemptID.getJobID(), attemptID.getTaskID().getId());
-
-            // Creating the file containing the data files generated by this 
task for this table
-            createFileForCommit(closedFiles, fileForCommitLocation, 
table.io());
+            if (table != null) {
+              HiveIcebergRecordWriter writer = writers.get(output);
+              DataFile[] closedFiles = writer != null ? writer.dataFiles() : 
new DataFile[0];
+              String fileForCommitLocation = 
generateFileForCommitLocation(table.location(), jobConf,
+                      attemptID.getJobID(), attemptID.getTaskID().getId());
+
+              // Creating the file containing the data files generated by this 
task for this table
+              createFileForCommit(closedFiles, fileForCommitLocation, 
table.io());
+            } else {
+              // When using Tez multi-table inserts, we could have more output 
tables in config than
+              // the actual tables this task has written to and has serialized 
in its config
+              LOG.info("CommitTask found no serialized table in config for 
table: {}.", output);
+            }
           }, IOException.class);
     } finally {
       if (tableExecutor != null) {
diff --git 
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
 
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index aca5c04..972fcf5 100644
--- 
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ 
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -659,9 +659,23 @@ public class TestHiveIcebergStorageHandlerWithEngine {
     Table target1 = testTables.createTable(shell, "target1", target1Schema, 
fileFormat, ImmutableList.of());
     Table target2 = testTables.createTable(shell, "target2", target2Schema, 
fileFormat, ImmutableList.of());
 
+    // simple insert: should create a single vertex writing to both target 
tables
     shell.executeStatement("FROM customers " +
-            "INSERT INTO target1 SELECT customer_id, first_name " +
-            "INSERT INTO target2 SELECT last_name, customer_id");
+        "INSERT INTO target1 SELECT customer_id, first_name " +
+        "INSERT INTO target2 SELECT last_name, customer_id");
+
+    // Check that everything is as expected
+    HiveIcebergTestUtils.validateData(target1, target1Records, 0);
+    HiveIcebergTestUtils.validateData(target2, target2Records, 1);
+
+    // truncate the target tables
+    testTables.truncateIcebergTable(target1);
+    testTables.truncateIcebergTable(target2);
+
+    // complex insert: should use a different vertex for each target table
+    shell.executeStatement("FROM customers " +
+        "INSERT INTO target1 SELECT customer_id, first_name ORDER BY 
first_name " +
+        "INSERT INTO target2 SELECT last_name, customer_id ORDER BY 
last_name");
 
     // Check that everything is as expected
     HiveIcebergTestUtils.validateData(target1, target1Records, 0);
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java 
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 4d3a60e..c282360 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.Tables;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hive.HiveCatalog;
@@ -275,6 +276,14 @@ abstract class TestTables {
     }
   }
 
+  /**
+   * Truncates an Iceberg table.
+   * @param table The iceberg table to truncate
+   */
+  public void truncateIcebergTable(Table table) {
+    table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+  }
+
   private static class CatalogToTables implements Tables {
 
     private final Catalog catalog;

Reply via email to