This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new cea6498  Hive: enable inserting data from joins (#2251)
cea6498 is described below

commit cea6498352e4de3bbe62fc3218cb06abd01ce3de
Author: Marton Bod <[email protected]>
AuthorDate: Fri Feb 19 11:28:42 2021 +0100

    Hive: enable inserting data from joins (#2251)
---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java    |  6 +++++-
 .../hive/TestHiveIcebergStorageHandlerWithEngine.java | 19 +++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 955fa8c..10a1e27 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -95,7 +95,11 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   @Override
   public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> map) {
     overlayTableProperties(conf, tableDesc, map);
-    // Putting the key into the table props and not the map, so that projection pushdown can be determined on a
+    // For Tez, setting the committer here is enough to make sure it'll be part of the jobConf
+    map.put("mapred.output.committer.class", HiveIcebergOutputCommitter.class.getName());
+    // For MR, the jobConf is set only in configureJobConf, so we're setting the write key here to detect it over there
+    map.put(WRITE_KEY, "true");
+    // Putting the key into the table props as well, so that projection pushdown can be determined on a
     // table-level and skipped only for output tables in HiveIcebergSerde. Properties from the map will be present in
     // the serde config for all tables in the query, not just the output tables, so we can't rely on that in the serde.
     tableDesc.getProperties().put(WRITE_KEY, "true");
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index c115fb3..ff120a3 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -422,6 +422,25 @@ public class TestHiveIcebergStorageHandlerWithEngine {
   }
 
   @Test
+  public void testInsertFromJoiningTwoIcebergTables() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+            .identity("last_name").build();
+    testTables.createTable(shell, "source_customers_1", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+            spec, fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    testTables.createTable(shell, "source_customers_2", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+            spec, fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    Table table = testTables.createTable(shell, "target_customers",
+            HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, fileFormat, ImmutableList.of());
+
+    shell.executeStatement("INSERT INTO target_customers SELECT a.customer_id, b.first_name, a.last_name FROM " +
+            "source_customers_1 a JOIN source_customers_2 b ON a.last_name = b.last_name");
+
+    HiveIcebergTestUtils.validateData(table, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 0);
+  }
+
+  @Test
   public void testWriteArrayOfPrimitivesInTable() throws IOException {
     Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
     Schema schema = new Schema(required(1, "id", Types.LongType.get()),

Reply via email to