pvary commented on a change in pull request #2052:
URL: https://github.com/apache/iceberg/pull/2052#discussion_r553873631



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -66,9 +67,7 @@ public void initialize(@Nullable Configuration configuration, 
Properties serDePr
     assertNotVectorizedTez(configuration);
 
     Schema tableSchema;
-    if (configuration.get(InputFormatConfig.TABLE_SCHEMA) != null) {
-      tableSchema = 
SchemaParser.fromJson(configuration.get(InputFormatConfig.TABLE_SCHEMA));
-    } else if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
+    if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {

Review comment:
       Is this needed?
   The original intent of the change was that we have the table schema at hand 
on the mappers/reducers. If we remove this then every mapper/reducer has to 
read the table once to get the schema.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -82,7 +81,17 @@ public void initialize(@Nullable Configuration 
configuration, Properties serDePr
     }
 
     String[] selectedColumns = 
ColumnProjectionUtils.getReadColumnNames(configuration);
-    Schema projectedSchema = selectedColumns.length > 0 ? 
tableSchema.select(selectedColumns) : tableSchema;
+    // When same table is joined multiple times, it is possible some selected 
columns are duplicated,
+    // in this case wrong recordStructField position leads wrong value or 
ArrayIndexOutOfBoundException
+    String[] distinctSelectedColumns = 
Arrays.stream(selectedColumns).distinct().toArray(String[]::new);
+    Schema projectedSchema = distinctSelectedColumns.length > 0 ?
+            tableSchema.select(distinctSelectedColumns) : tableSchema;
+    // the input split mapper handles does not belong to this table
+    // it is necessary to ensure projectedSchema equals to tableSchema,
+    // or we cannot find selectOperator's column from inspector
+    if (projectedSchema.columns().size() != distinctSelectedColumns.length) {
+      projectedSchema = tableSchema;
+    }

Review comment:
       @marton-bod: Could you please take a look? You know more about the 
schema projection.
   Thanks,
   Peter

##########
File path: 
mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
##########
@@ -60,18 +60,23 @@ static TestHiveShell shell() {
   }
 
   static TestTables testTables(TestHiveShell shell, TestTables.TestTableType 
testTableType, TemporaryFolder temp)
-      throws IOException {
+          throws IOException {
 
     return testTableType.instance(shell.metastore().hiveConf(), temp);
   }
 
   static void init(TestHiveShell shell, TestTables testTables, TemporaryFolder 
temp, String engine) {
+    init(shell, testTables, temp, engine, "false");

Review comment:
       Could we use a boolean instead of the string?

##########
File path: 
mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -42,24 +42,41 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.junit.runners.Parameterized.Parameter;
 import static org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
 public class TestHiveIcebergStorageHandlerWithEngine {
 
-  private static final String[] EXECUTION_ENGINES = new String[] {"tez", "mr"};
+  private static final String[] EXECUTION_ENGINES = new String[]{"tez", "mr"};

Review comment:
       nit: I usually try to avoid formatting-only changes

##########
File path: 
mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -42,24 +42,41 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.junit.runners.Parameterized.Parameter;
 import static org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
 public class TestHiveIcebergStorageHandlerWithEngine {
 
-  private static final String[] EXECUTION_ENGINES = new String[] {"tez", "mr"};
+  private static final String[] EXECUTION_ENGINES = new String[]{"tez", "mr"};
+
+  private static final String[] CBO_ENABLES = new String[]{"true", "false"};

Review comment:
       We might not need this list, as the values cannot be changed. Could we 
just add them by hand in the `parameters()` method?

##########
File path: 
mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -42,24 +42,41 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.junit.runners.Parameterized.Parameter;
 import static org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
 public class TestHiveIcebergStorageHandlerWithEngine {
 
-  private static final String[] EXECUTION_ENGINES = new String[] {"tez", "mr"};
+  private static final String[] EXECUTION_ENGINES = new String[]{"tez", "mr"};
+
+  private static final String[] CBO_ENABLES = new String[]{"true", "false"};

Review comment:
       We might not need this list, as the values cannot be extended even in 
the long run. Could we just add them by hand in the `parameters()` method?

##########
File path: 
mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -107,6 +126,9 @@
   @Parameter(2)
   public TestTables.TestTableType testTableType;
 
+  @Parameter(3)
+  public String cboEnable;

Review comment:
       Boolean please

##########
File path: 
mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -142,29 +164,61 @@ public void testScanTable() throws IOException {
 
     // Adding the ORDER BY clause will cause Hive to spawn a local MR job this 
time.
     List<Object[]> descRows =
-        shell.executeStatement("SELECT first_name, customer_id FROM 
default.customers ORDER BY customer_id DESC");
+            shell.executeStatement("SELECT first_name, customer_id FROM 
default.customers ORDER BY customer_id DESC");
 
     Assert.assertEquals(3, descRows.size());
-    Assert.assertArrayEquals(new Object[] {"Trudy", 2L}, descRows.get(0));
-    Assert.assertArrayEquals(new Object[] {"Bob", 1L}, descRows.get(1));
-    Assert.assertArrayEquals(new Object[] {"Alice", 0L}, descRows.get(2));
+    Assert.assertArrayEquals(new Object[]{"Trudy", 2L}, descRows.get(0));
+    Assert.assertArrayEquals(new Object[]{"Bob", 1L}, descRows.get(1));
+    Assert.assertArrayEquals(new Object[]{"Alice", 0L}, descRows.get(2));
+  }

Review comment:
       Am I right that these are formatting-only changes?
   Reviewing is much easier without them, so I usually try to avoid them in 
my PRs.

##########
File path: 
mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -142,29 +164,61 @@ public void testScanTable() throws IOException {
 
     // Adding the ORDER BY clause will cause Hive to spawn a local MR job this 
time.
     List<Object[]> descRows =
-        shell.executeStatement("SELECT first_name, customer_id FROM 
default.customers ORDER BY customer_id DESC");
+            shell.executeStatement("SELECT first_name, customer_id FROM 
default.customers ORDER BY customer_id DESC");
 
     Assert.assertEquals(3, descRows.size());
-    Assert.assertArrayEquals(new Object[] {"Trudy", 2L}, descRows.get(0));
-    Assert.assertArrayEquals(new Object[] {"Bob", 1L}, descRows.get(1));
-    Assert.assertArrayEquals(new Object[] {"Alice", 0L}, descRows.get(2));
+    Assert.assertArrayEquals(new Object[]{"Trudy", 2L}, descRows.get(0));
+    Assert.assertArrayEquals(new Object[]{"Bob", 1L}, descRows.get(1));
+    Assert.assertArrayEquals(new Object[]{"Alice", 0L}, descRows.get(2));
+  }
+
+  @Test
+  public void testSelectedColumnsNoOverlapJoin() throws IOException {
+    testTables.createTable(shell, "products", PRODUCT_SCHEMA, fileFormat, 
PRODUCT_RECORDS);
+    testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, 
ORDER_RECORDS);
+
+    List<Object[]> rows = shell.executeStatement(
+            "SELECT o.order_id, o.customer_id, o.total, p.name " +
+                    "FROM default.orders o JOIN default.products p ON 
o.product_id = p.id ORDER BY o.order_id"
+    );
+
+    Assert.assertEquals(3, rows.size());
+    Assert.assertArrayEquals(new Object[]{100L, 0L, 11.11d, "skirt"}, 
rows.get(0));
+    Assert.assertArrayEquals(new Object[]{101L, 0L, 22.22d, "tee"}, 
rows.get(1));
+    Assert.assertArrayEquals(new Object[]{102L, 1L, 33.33d, "watch"}, 
rows.get(2));
   }
 
   @Test
-  public void testJoinTables() throws IOException {
+  public void testSelectedColumnsOverlapJoin() throws IOException {
     testTables.createTable(shell, "customers", 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
-        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+            HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, 
ORDER_RECORDS);
+
+    List<Object[]> rows = shell.executeStatement(
+            "SELECT c.first_name, o.order_id " +
+                    "FROM default.orders o JOIN default.customers c ON 
o.customer_id = c.customer_id " +
+                    "ORDER BY o.order_id DESC"
+    );
+
+    Assert.assertEquals(3, rows.size());
+    Assert.assertArrayEquals(new Object[]{"Bob", 102L}, rows.get(0));
+    Assert.assertArrayEquals(new Object[]{"Alice", 101L}, rows.get(1));
+    Assert.assertArrayEquals(new Object[]{"Alice", 100L}, rows.get(2));
+  }
+
+  @Test
+  public void testSelfJoin() throws IOException {
     testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, 
ORDER_RECORDS);
 
     List<Object[]> rows = shell.executeStatement(
-            "SELECT c.customer_id, c.first_name, o.order_id, o.total " +
-                    "FROM default.customers c JOIN default.orders o ON 
c.customer_id = o.customer_id " +
-                    "ORDER BY c.customer_id, o.order_id"
+            "SELECT o1.order_id, o1.customer_id, o1.total " +
+                    "FROM default.orders o1 JOIN default.orders o2 ON 
o1.order_id = o2.order_id ORDER BY o1.order_id"

Review comment:
       Why was this change needed?
   Could you please help?
   Thanks,
   Peter

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -66,9 +67,7 @@ public void initialize(@Nullable Configuration configuration, 
Properties serDePr
     assertNotVectorizedTez(configuration);
 
     Schema tableSchema;
-    if (configuration.get(InputFormatConfig.TABLE_SCHEMA) != null) {
-      tableSchema = 
SchemaParser.fromJson(configuration.get(InputFormatConfig.TABLE_SCHEMA));
-    } else if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
+    if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {

Review comment:
       Thanks for the explanation! This is something @marton-bod has already 
tried his hand at. We might have to use the table identifier to prefix/postfix 
the schema...

##########
File path: 
mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -142,29 +164,61 @@ public void testScanTable() throws IOException {
 
     // Adding the ORDER BY clause will cause Hive to spawn a local MR job this 
time.
     List<Object[]> descRows =
-        shell.executeStatement("SELECT first_name, customer_id FROM 
default.customers ORDER BY customer_id DESC");
+            shell.executeStatement("SELECT first_name, customer_id FROM 
default.customers ORDER BY customer_id DESC");
 
     Assert.assertEquals(3, descRows.size());
-    Assert.assertArrayEquals(new Object[] {"Trudy", 2L}, descRows.get(0));
-    Assert.assertArrayEquals(new Object[] {"Bob", 1L}, descRows.get(1));
-    Assert.assertArrayEquals(new Object[] {"Alice", 0L}, descRows.get(2));
+    Assert.assertArrayEquals(new Object[]{"Trudy", 2L}, descRows.get(0));
+    Assert.assertArrayEquals(new Object[]{"Bob", 1L}, descRows.get(1));
+    Assert.assertArrayEquals(new Object[]{"Alice", 0L}, descRows.get(2));
+  }
+
+  @Test
+  public void testSelectedColumnsNoOverlapJoin() throws IOException {
+    testTables.createTable(shell, "products", PRODUCT_SCHEMA, fileFormat, 
PRODUCT_RECORDS);
+    testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, 
ORDER_RECORDS);
+
+    List<Object[]> rows = shell.executeStatement(
+            "SELECT o.order_id, o.customer_id, o.total, p.name " +
+                    "FROM default.orders o JOIN default.products p ON 
o.product_id = p.id ORDER BY o.order_id"
+    );
+
+    Assert.assertEquals(3, rows.size());
+    Assert.assertArrayEquals(new Object[]{100L, 0L, 11.11d, "skirt"}, 
rows.get(0));
+    Assert.assertArrayEquals(new Object[]{101L, 0L, 22.22d, "tee"}, 
rows.get(1));
+    Assert.assertArrayEquals(new Object[]{102L, 1L, 33.33d, "watch"}, 
rows.get(2));
   }
 
   @Test
-  public void testJoinTables() throws IOException {
+  public void testSelectedColumnsOverlapJoin() throws IOException {
     testTables.createTable(shell, "customers", 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
-        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+            HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, 
ORDER_RECORDS);
+
+    List<Object[]> rows = shell.executeStatement(
+            "SELECT c.first_name, o.order_id " +
+                    "FROM default.orders o JOIN default.customers c ON 
o.customer_id = c.customer_id " +
+                    "ORDER BY o.order_id DESC"
+    );
+
+    Assert.assertEquals(3, rows.size());
+    Assert.assertArrayEquals(new Object[]{"Bob", 102L}, rows.get(0));
+    Assert.assertArrayEquals(new Object[]{"Alice", 101L}, rows.get(1));
+    Assert.assertArrayEquals(new Object[]{"Alice", 100L}, rows.get(2));
+  }
+
+  @Test
+  public void testSelfJoin() throws IOException {
     testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, 
ORDER_RECORDS);
 
     List<Object[]> rows = shell.executeStatement(
-            "SELECT c.customer_id, c.first_name, o.order_id, o.total " +
-                    "FROM default.customers c JOIN default.orders o ON 
c.customer_id = o.customer_id " +
-                    "ORDER BY c.customer_id, o.order_id"
+            "SELECT o1.order_id, o1.customer_id, o1.total " +
+                    "FROM default.orders o1 JOIN default.orders o2 ON 
o1.order_id = o2.order_id ORDER BY o1.order_id"

Review comment:
       Oh.. I have missed that the FROM part is from the orders table.
   Sorry




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to