This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 90bc329  Hive: fix issue of inserting empty data on Tez (#2516)
90bc329 is described below

commit 90bc3297fbde62150281698ad5b61dc0dfb2be9c
Author: Marton Bod <[email protected]>
AuthorDate: Wed Apr 28 09:45:09 2021 +0200

    Hive: fix issue of inserting empty data on Tez (#2516)
---
 .../mr/hive/HiveIcebergOutputCommitter.java        | 29 ++++++++++++++++------
 .../TestHiveIcebergStorageHandlerWithEngine.java   | 16 ++++++++++++
 2 files changed, 38 insertions(+), 7 deletions(-)

diff --git 
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java 
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index a7d7891..a6ea8e2 100644
--- 
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ 
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -25,6 +25,7 @@ import java.io.ObjectOutputStream;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
@@ -50,6 +51,7 @@ import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
@@ -92,8 +94,12 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
 
     TaskAttemptID attemptID = context.getTaskAttemptID();
     JobConf jobConf = context.getJobConf();
-    Map<String, HiveIcebergRecordWriter> writers = 
HiveIcebergRecordWriter.getWriters(attemptID);
     Collection<String> outputs = 
HiveIcebergStorageHandler.outputTables(context.getJobConf());
+    Map<String, HiveIcebergRecordWriter> writers = 
Optional.ofNullable(HiveIcebergRecordWriter.getWriters(attemptID))
+        .orElseGet(() -> {
+          LOG.info("CommitTask found no writers for output tables: {}, 
attemptID: {}", outputs, attemptID);
+          return ImmutableMap.of();
+        });
 
     ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
     try {
@@ -107,11 +113,16 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
             Table table = 
HiveIcebergStorageHandler.table(context.getJobConf(), output);
             if (table != null) {
               HiveIcebergRecordWriter writer = writers.get(output);
-              DataFile[] closedFiles = writer != null ? writer.dataFiles() : 
new DataFile[0];
+              DataFile[] closedFiles;
+              if (writer != null) {
+                closedFiles = writer.dataFiles();
+              } else {
+                LOG.info("CommitTask found no writer for specific table: {}, 
attemptID: {}", output, attemptID);
+                closedFiles = new DataFile[0];
+              }
+                // Creating the file containing the data files generated by 
this task for this table
               String fileForCommitLocation = 
generateFileForCommitLocation(table.location(), jobConf,
                       attemptID.getJobID(), attemptID.getTaskID().getId());
-
-              // Creating the file containing the data files generated by this 
task for this table
               createFileForCommit(closedFiles, fileForCommitLocation, 
table.io());
             } else {
               // When using Tez multi-table inserts, we could have more output 
tables in config than
@@ -176,9 +187,13 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
           .executeWith(tableExecutor)
           .run(output -> {
             Table table = HiveIcebergStorageHandler.table(jobConf, output);
-            String catalogName = 
HiveIcebergStorageHandler.catalogName(jobConf, output);
-            jobLocations.add(generateJobLocation(table.location(), jobConf, 
jobContext.getJobID()));
-            commitTable(table.io(), fileExecutor, jobContext, output, 
table.location(), catalogName);
+            if (table != null) {
+              String catalogName = 
HiveIcebergStorageHandler.catalogName(jobConf, output);
+              jobLocations.add(generateJobLocation(table.location(), jobConf, 
jobContext.getJobID()));
+              commitTable(table.io(), fileExecutor, jobContext, output, 
table.location(), catalogName);
+            } else {
+              LOG.info("CommitJob found no serialized table in config for 
table: {}. Skipping job commit.", output);
+            }
           });
     } finally {
       fileExecutor.shutdown();
diff --git 
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
 
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index 972fcf5..48bd181 100644
--- 
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ 
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -705,6 +705,22 @@ public class TestHiveIcebergStorageHandlerWithEngine {
     Assert.assertEquals("Linda", results.get(0)[1]);
   }
 
+  @Test
+  public void testInsertEmptyResultSet() throws IOException {
+    Table source = testTables.createTable(shell, "source", 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+            fileFormat, ImmutableList.of());
+    Table target = testTables.createTable(shell, "target", 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+            fileFormat, ImmutableList.of());
+
+    shell.executeStatement("INSERT INTO target SELECT * FROM source");
+    HiveIcebergTestUtils.validateData(target, ImmutableList.of(), 0);
+
+    testTables.appendIcebergTable(shell.getHiveConf(), source, fileFormat, 
null,
+            HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    shell.executeStatement("INSERT INTO target SELECT * FROM source WHERE 
first_name = 'Nobody'");
+    HiveIcebergTestUtils.validateData(target, ImmutableList.of(), 0);
+  }
+
   private void testComplexTypeWrite(Schema schema, List<Record> records) 
throws IOException {
     String tableName = "complex_table";
     Table table = testTables.createTable(shell, "complex_table", schema, 
fileFormat, ImmutableList.of());

Reply via email to