gustavoatt commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1160930338


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/PartitionedWritesTestBase.java:
##########
@@ -183,4 +184,91 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testWriteWithOutputSpec() throws NoSuchTableException {
+    Table table = validationCatalog.loadTable(tableIdent);
+    // Drop all records in table to have a fresh start.
+    sql("DELETE FROM %s", tableName);
+    table.refresh();
+
+    final int originalSpecId = table.spec().specId();
+    table.updateSpec().addField("data").commit();
+
+    table.refresh();
+    sql("REFRESH TABLE %s", tableName);
+
+    // By default, we write to the current spec.
+    sql("INSERT INTO TABLE %s VALUES (10, 'a')", tableName);
+
+    List<Object[]> expected = ImmutableList.of(row(10L, "a", 
table.spec().specId()));
+    assertEquals(
+        "Rows must match",
+        expected,
+        sql("SELECT id, data, _spec_id FROM %s WHERE id >= 10 ORDER BY id", 
tableName));
+
+    // Output spec ID should be respected when present.
+    List<SimpleRecord> data =
+        ImmutableList.of(new SimpleRecord(11, "b"), new SimpleRecord(12, "c"));
+    spark
+        .createDataFrame(data, SimpleRecord.class)
+        .toDF()
+        .writeTo(tableName)
+        .option("output-spec-id", Integer.toString(originalSpecId))
+        .append();
+
+    expected =
+        ImmutableList.of(
+            row(10L, "a", table.spec().specId()),
+            row(11L, "b", originalSpecId),
+            row(12L, "c", originalSpecId));
+    assertEquals(
+        "Rows must match",
+        expected,
+        sql("SELECT id, data, _spec_id FROM %s WHERE id >= 10 ORDER BY id", 
tableName));
+
+    // Verify that the actual partitions are written with the correct spec ID.
+    // Two of the partitions should have the original spec ID and one should 
have the new one.
+
+    if (!this.getClass().equals(TestPartitionedWritesToWapBranch.class)) {

Review Comment:
   Just filed one at https://github.com/apache/iceberg/issues/7297. Basically 
I'm not able to read from the partitions table the newly written data into the 
WAP branch. It does work for non WAP branches though.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -118,6 +119,10 @@ abstract class SparkWrite implements Write, 
RequiresDistributionAndOrdering {
     this.applicationId = applicationId;
     this.wapEnabled = writeConf.wapEnabled();
     this.wapId = writeConf.wapId();
+    this.outputSpecId =

Review Comment:
   Done, added a check here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to