stevenzwu commented on code in PR #13555:
URL: https://github.com/apache/iceberg/pull/13555#discussion_r2220354876
##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -300,7 +303,8 @@ public void testBinPackAfterPartitionChange() {
Integer.toString(averageFileSize(table) + 1000))
.option(
RewriteDataFiles.TARGET_FILE_SIZE_BYTES,
- Integer.toString(averageFileSize(table) + 1001))
+ // Increase max file size for V3 to account for additional row lineage fields
+ Integer.toString(averageFileSize(table) + (formatVersion >= 3 ? 11000 : 1001)))
Review Comment:
Curious: did we come up with the `11000` number after a few tries?
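If the constant keeps needing retuning, one alternative (purely a sketch, not in the PR) is to derive the V3 headroom from the table instead of hand-tuning it; `totalRowCount` is a hypothetical helper, and two long-backed lineage fields per row is an assumption:
```
// Sketch: size the V3 headroom from the data rather than a tuned constant.
// Assumes each row adds two long-backed lineage fields
// (_row_id, _last_updated_sequence_number); totalRowCount is hypothetical.
long lineageHeadroom = formatVersion >= 3 ? 2L * Long.BYTES * totalRowCount(table) : 0L;
String maxFileSize = Long.toString(averageFileSize(table) + 1001 + lineageHeadroom);
```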
##########
spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java:
##########
@@ -976,10 +978,50 @@ public void testRewriteDataFilesSummary() {
EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("4.0"));
}
+ @TestTemplate
+ public void testRewriteDataFilesPreservesLineage() throws NoSuchTableException {
+ createTable(ImmutableMap.of(TableProperties.FORMAT_VERSION, "3"));
+ List<ThreeColumnRecord> records = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ records.add(new ThreeColumnRecord(i, null, null));
+ }
+
+ spark
+ .createDataFrame(records, ThreeColumnRecord.class)
+ .repartition(10)
+ .writeTo(tableName)
+ .append();
+ List<Object[]> expectedRowsWithLineage =
+ sql("SELECT c1, _row_id, _last_updated_sequence_number FROM %s ORDER
BY c1", tableName);
+ List<Object[]> output =
+ sql("CALL %s.system.rewrite_data_files(table => '%s')", catalogName,
tableIdent);
+
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data file",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+
+ List<Object[]> rowsWithLineageAfterRewrite =
+ sql("SELECT c1, _row_id, _last_updated_sequence_number FROM %s ORDER
BY c1", tableName);
+ assertEquals(
+ "Rows with lineage before rewrite should equal rows with lineage after
rewrite",
+ expectedRowsWithLineage,
+ rowsWithLineageAfterRewrite);
+ }
+
private void createTable() {
sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName);
}
+ private void createTable(Map<String, String> properties) {
Review Comment:
nit: This is only used in one place. What about just inlining the SQL, as is done in many other places? It would reduce 2 SQL executions to 1.
```
sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg
TBLPROPERTIES('format-version' = '3')", tableName);
```
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java:
##########
@@ -163,6 +165,14 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri
selector = TAG_PREFIX + tag;
}
+ String groupId =
+ options.getOrDefault(
+ SparkReadOptions.SCAN_TASK_SET_ID,
+ options.get(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID));
+ if (groupId != null) {
+ selector = REWRITE_PREFIX + groupId.replace("-", "");
Review Comment:
Yeah, the matcher only looks at the prefix; this is not a split on a separator char.
Please add a comment to explain the purpose of the `replace`: it shortens the name a bit, by 4 chars to be precise for a UUID string.
BTW, this will add 40 chars to the identifier (8 prefix chars + 32 hex chars; it would be 44 with the 4 hyphen chars kept). Is there any identifier size limit that we should be concerned about? I vaguely remember some scenario has a 120-char limit.
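For illustration, the suggested comment could read something like this (wording is mine, not from the PR):
```
// Strip the UUID's hyphens to shorten the selector: 8 prefix chars
// + 32 hex chars = 40 chars appended, instead of 44 with hyphens kept.
selector = REWRITE_PREFIX + groupId.replace("-", "");
```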
##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -467,76 +471,111 @@ public void testDataFilesNotRewrittenWithLowDeleteRatio() throws Exception {
}
@TestTemplate
- public void testBinPackWithDeletes() throws IOException {
- assumeThat(formatVersion).isGreaterThanOrEqualTo(2);
+ public void testBinPackWithDVs() throws IOException {
Review Comment:
The diff seems messed up.
I assume you just renamed this test to the `testBinPackWithV2PositionDeletes` test below and added this new `testBinPackWithDVs`.
Ideally, this line should only diff on the renaming, with the new test method `testBinPackWithDVs` added after this method.
##########
spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java:
##########
@@ -976,10 +978,50 @@ public void testRewriteDataFilesSummary() {
EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("4.0"));
}
+ @TestTemplate
+ public void testRewriteDataFilesPreservesLineage() throws NoSuchTableException {
+ createTable(ImmutableMap.of(TableProperties.FORMAT_VERSION, "3"));
+ List<ThreeColumnRecord> records = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ records.add(new ThreeColumnRecord(i, null, null));
+ }
+
+ spark
+ .createDataFrame(records, ThreeColumnRecord.class)
+ .repartition(10)
+ .writeTo(tableName)
+ .append();
+ List<Object[]> expectedRowsWithLineage =
Review Comment:
We should assert on the expected values for the row_id and seq_num columns here. Otherwise, if both results have null row_id and seq_num, the line 1006 assertion won't be able to catch that problem.
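As a sketch of what that could look like (AssertJ style, matching the surrounding tests; the column positions are assumptions based on the SELECT above):
```
// Guard against a null-vs-null match: lineage columns must be populated
// before the rewrite for the later equality check to be meaningful.
for (Object[] rowBefore : expectedRowsWithLineage) {
  assertThat(rowBefore[1]).as("_row_id should be assigned").isNotNull();
  assertThat(rowBefore[2]).as("_last_updated_sequence_number should be assigned").isNotNull();
}
```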
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java:
##########
@@ -128,74 +128,124 @@ public String name() {
return name;
}
- private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException
{
+ private SparkTable load(Identifier ident) throws NoSuchTableException {
Preconditions.checkArgument(
ident.namespace().length == 0, CLASS_NAME + " does not support namespaces");
Pair<String, List<String>> parsedIdent = parseIdent(ident);
String key = parsedIdent.first();
- List<String> metadata = parsedIdent.second();
+ TableLoadOptions options = parseLoadOptions(parsedIdent.second());
+ validateTableOptions(options);
Review Comment:
nit: validation can probably be done inside `parseLoadOptions`, which would only return a parsed POJO if validation passed.
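A minimal sketch of that shape, with assumed names: `fromMetadata` is a hypothetical factory, and the concrete check merely stands in for whatever `validateTableOptions` currently enforces:
```
// Sketch: fold validation into parsing so load() never sees an
// unvalidated options object.
private TableLoadOptions parseLoadOptions(List<String> metadata) {
  TableLoadOptions options = TableLoadOptions.fromMetadata(metadata); // hypothetical factory
  Preconditions.checkArgument(
      options.snapshotId() == null || options.branch() == null,
      "Cannot specify both snapshot-id and branch");
  return options;
}
```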