szehon-ho commented on code in PR #7120:
URL: https://github.com/apache/iceberg/pull/7120#discussion_r1160352335


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -118,6 +119,10 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     this.applicationId = applicationId;
     this.wapEnabled = writeConf.wapEnabled();
     this.wapId = writeConf.wapId();
+    this.outputSpecId =

Review Comment:
   Should we add some validation that the given spec ID is contained in table.specs()?
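   A minimal sketch of that validation follows. It assumes the write resolves its spec ID from an optional "output-spec-id" value and otherwise falls back to the table's current spec. `resolveOutputSpecId` is a hypothetical helper, not code from this PR. The `Map<Integer, String>` stands in for `table.specs()`, which returns a `Map<Integer, PartitionSpec>` in Iceberg, so the sketch compiles without Iceberg on the classpath. Real Iceberg code would more likely express the check with Guava's `Preconditions.checkArgument`.

```java
import java.util.Map;

public class OutputSpecIdValidation {

  // Hypothetical helper (not from the PR): picks the spec ID a write should use.
  // `specs` stands in for table.specs() (spec ID -> spec); String values are
  // placeholders for PartitionSpec so the sketch has no Iceberg dependency.
  // `requestedSpecId` is null when no "output-spec-id" option was supplied.
  static int resolveOutputSpecId(
      Map<Integer, String> specs, int currentSpecId, Integer requestedSpecId) {
    if (requestedSpecId == null) {
      // No explicit option: write with the table's current spec.
      return currentSpecId;
    }
    if (!specs.containsKey(requestedSpecId)) {
      // Reject IDs that the table metadata does not know about.
      throw new IllegalArgumentException(
          String.format(
              "Output spec id %d is not a valid spec id for table; valid ids: %s",
              requestedSpecId, specs.keySet()));
    }
    return requestedSpecId;
  }

  public static void main(String[] args) {
    // Two specs, mirroring the test: the original spec and the one with "data" added.
    Map<Integer, String> specs = Map.of(0, "original", 1, "original + data");

    System.out.println(resolveOutputSpecId(specs, 1, null)); // prints 1
    System.out.println(resolveOutputSpecId(specs, 1, 0)); // prints 0

    try {
      resolveOutputSpecId(specs, 1, 5);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected"); // spec ID 5 is not in specs
    }
  }
}
```

   Failing fast on an unknown ID means a typo in the option fails the write immediately instead of producing files tagged with a spec ID that the table does not define.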



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/PartitionedWritesTestBase.java:
##########
@@ -183,4 +184,91 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testWriteWithOutputSpec() throws NoSuchTableException {
+    Table table = validationCatalog.loadTable(tableIdent);
+    // Drop all records in table to have a fresh start.
+    sql("DELETE FROM %s", tableName);
+    table.refresh();
+
+    final int originalSpecId = table.spec().specId();
+    table.updateSpec().addField("data").commit();
+
+    table.refresh();
+    sql("REFRESH TABLE %s", tableName);
+
+    // By default, we write to the current spec.
+    sql("INSERT INTO TABLE %s VALUES (10, 'a')", tableName);
+
+    List<Object[]> expected = ImmutableList.of(row(10L, "a", table.spec().specId()));
+    assertEquals(
+        "Rows must match",
+        expected,
+        sql("SELECT id, data, _spec_id FROM %s WHERE id >= 10 ORDER BY id", tableName));
+
+    // Output spec ID should be respected when present.
+    List<SimpleRecord> data =
+        ImmutableList.of(new SimpleRecord(11, "b"), new SimpleRecord(12, "c"));
+    spark
+        .createDataFrame(data, SimpleRecord.class)
+        .toDF()
+        .writeTo(tableName)
+        .option("output-spec-id", Integer.toString(originalSpecId))
+        .append();
+
+    expected =
+        ImmutableList.of(
+            row(10L, "a", table.spec().specId()),
+            row(11L, "b", originalSpecId),
+            row(12L, "c", originalSpecId));
+    assertEquals(
+        "Rows must match",
+        expected,
+        sql("SELECT id, data, _spec_id FROM %s WHERE id >= 10 ORDER BY id", tableName));
+
+    // Verify that the actual partitions are written with the correct spec ID.
+    // Two of the partitions should have the original spec ID and one should have the new one.
+

Review Comment:
   Nit: can remove extra newline here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.