This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 90bc329 Hive: fix issue of inserting empty data on Tez (#2516)
90bc329 is described below
commit 90bc3297fbde62150281698ad5b61dc0dfb2be9c
Author: Marton Bod <[email protected]>
AuthorDate: Wed Apr 28 09:45:09 2021 +0200
Hive: fix issue of inserting empty data on Tez (#2516)
---
.../mr/hive/HiveIcebergOutputCommitter.java | 29 ++++++++++++++++------
.../TestHiveIcebergStorageHandlerWithEngine.java | 16 ++++++++++++
2 files changed, 38 insertions(+), 7 deletions(-)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index a7d7891..a6ea8e2 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -25,6 +25,7 @@ import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
@@ -50,6 +51,7 @@ import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
@@ -92,8 +94,12 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
TaskAttemptID attemptID = context.getTaskAttemptID();
JobConf jobConf = context.getJobConf();
-    Map<String, HiveIcebergRecordWriter> writers = HiveIcebergRecordWriter.getWriters(attemptID);
     Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
+    Map<String, HiveIcebergRecordWriter> writers = Optional.ofNullable(HiveIcebergRecordWriter.getWriters(attemptID))
+        .orElseGet(() -> {
+          LOG.info("CommitTask found no writers for output tables: {}, attemptID: {}", outputs, attemptID);
+          return ImmutableMap.of();
+        });
ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
try {
@@ -107,11 +113,16 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
       Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
if (table != null) {
HiveIcebergRecordWriter writer = writers.get(output);
-        DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[0];
+ DataFile[] closedFiles;
+ if (writer != null) {
+ closedFiles = writer.dataFiles();
+ } else {
+          LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, attemptID);
+ closedFiles = new DataFile[0];
+ }
+        // Creating the file containing the data files generated by this task for this table
         String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
             attemptID.getJobID(), attemptID.getTaskID().getId());
-
-        // Creating the file containing the data files generated by this task for this table
         createFileForCommit(closedFiles, fileForCommitLocation, table.io());
} else {
         // When using Tez multi-table inserts, we could have more output tables in config than
@@ -176,9 +187,13 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
.executeWith(tableExecutor)
.run(output -> {
Table table = HiveIcebergStorageHandler.table(jobConf, output);
-          String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, output);
-          jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
-          commitTable(table.io(), fileExecutor, jobContext, output, table.location(), catalogName);
+          if (table != null) {
+            String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, output);
+            jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
+            commitTable(table.io(), fileExecutor, jobContext, output, table.location(), catalogName);
+          } else {
+            LOG.info("CommitJob found no serialized table in config for table: {}. Skipping job commit.", output);
+          }
});
} finally {
fileExecutor.shutdown();
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index 972fcf5..48bd181 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -705,6 +705,22 @@ public class TestHiveIcebergStorageHandlerWithEngine {
Assert.assertEquals("Linda", results.get(0)[1]);
}
+ @Test
+ public void testInsertEmptyResultSet() throws IOException {
+    Table source = testTables.createTable(shell, "source", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, ImmutableList.of());
+    Table target = testTables.createTable(shell, "target", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, ImmutableList.of());
+
+    shell.executeStatement("INSERT INTO target SELECT * FROM source");
+    HiveIcebergTestUtils.validateData(target, ImmutableList.of(), 0);
+
+    testTables.appendIcebergTable(shell.getHiveConf(), source, fileFormat, null,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    shell.executeStatement("INSERT INTO target SELECT * FROM source WHERE first_name = 'Nobody'");
+    HiveIcebergTestUtils.validateData(target, ImmutableList.of(), 0);
+  }
+
   private void testComplexTypeWrite(Schema schema, List<Record> records) throws IOException {
String tableName = "complex_table";
     Table table = testTables.createTable(shell, "complex_table", schema, fileFormat, ImmutableList.of());