jerry-024 commented on code in PR #7972:
URL: https://github.com/apache/paimon/pull/7972#discussion_r3302591102
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java:
##########
@@ -179,6 +179,10 @@ public static boolean buildIndex(
List<ManifestEntry> entries = indexBuilder.scan();
List<IndexManifestEntry> deletedIndexEntries = indexBuilder.deletedIndexEntries();
+ Long rowIdReassignCheckFromSnapshot =
+ indexBuilder.scanSnapshotId().isPresent()
+ ? indexBuilder.scanSnapshotId().get()
+ : null;
Review Comment:
nit: Can be simplified to:
```java
Long rowIdReassignCheckFromSnapshot =
indexBuilder.scanSnapshotId().orElse(null);
```
`Optional.orElse(null)` is idiomatic and avoids calling `scanSnapshotId()` twice.
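A self-contained sketch of the difference between the two forms. It assumes `scanSnapshotId()` returns `Optional<Long>`, which fits the `isPresent()`/`get()` calls in the diff. `FakeIndexBuilder` is a hypothetical stand-in that only counts how often `scanSnapshotId()` is invoked:

```java
import java.util.Optional;

// Self-contained demo; FakeIndexBuilder is a hypothetical stand-in for indexBuilder
// and assumes scanSnapshotId() returns Optional<Long>.
class OrElseNullDemo {

    static final class FakeIndexBuilder {
        private final Long snapshotId;
        int calls = 0; // number of scanSnapshotId() invocations so far

        FakeIndexBuilder(Long snapshotId) {
            this.snapshotId = snapshotId;
        }

        Optional<Long> scanSnapshotId() {
            calls++;
            return Optional.ofNullable(snapshotId);
        }
    }

    // Form in the PR: calls scanSnapshotId() twice when a value is present.
    static Long viaTernary(FakeIndexBuilder builder) {
        return builder.scanSnapshotId().isPresent() ? builder.scanSnapshotId().get() : null;
    }

    // Suggested form: one call; an empty Optional becomes null.
    static Long viaOrElse(FakeIndexBuilder builder) {
        return builder.scanSnapshotId().orElse(null);
    }

    public static void main(String[] args) {
        FakeIndexBuilder a = new FakeIndexBuilder(7L);
        FakeIndexBuilder b = new FakeIndexBuilder(7L);
        System.out.println(viaTernary(a) + ", calls=" + a.calls); // 7, calls=2
        System.out.println(viaOrElse(b) + ", calls=" + b.calls); // 7, calls=1
        System.out.println(viaOrElse(new FakeIndexBuilder(null))); // null
    }
}
```

Both forms yield the same value. The `orElse(null)` version also stays correct if `scanSnapshotId()` is ever made non-idempotent or expensive.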
##########
paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java:
##########
@@ -1008,6 +1009,100 @@ public void testCommitManifestWithProperties() throws Exception {
}
}
+ @Test
+ public void testReplaceManifestListWithRowIdReassignProperty() throws Exception {
+ TestFileStore store = createStore(false);
+
+ List<KeyValue> keyValues = generateDataList(1);
+ BinaryRow partition = gen.getPartition(keyValues.get(0));
+ Snapshot latest = store.commitData(keyValues, s -> partition, kv -> 0).get(0);
+
+ Map<String, String> reassignProperties = new HashMap<>();
+ reassignProperties.put("keep", "v1");
+ reassignProperties.put(Snapshot.ROW_ID_REASSIGN_PROPERTY, "true");
+ try (FileStoreCommitImpl commit = store.newCommit()) {
+ assertThat(
+ commit.replaceManifestList(
+ latest,
+ latest.totalRecordCount(),
+ baseManifestList(latest),
+ deltaManifestList(latest),
+ latest.indexManifest(),
+ latest.nextRowId(),
+ reassignProperties))
+ .isTrue();
+ }
+
+ Snapshot reassignSnapshot = checkNotNull(store.snapshotManager().latestSnapshot());
+ assertThat(reassignSnapshot.properties()).isEqualTo(reassignProperties);
+
+ try (FileStoreCommitImpl commit = store.newCommit()) {
+ assertThat(
+ commit.replaceManifestList(
+ reassignSnapshot,
+ reassignSnapshot.totalRecordCount(),
+ baseManifestList(reassignSnapshot),
+ deltaManifestList(reassignSnapshot),
+ reassignSnapshot.indexManifest(),
+ reassignSnapshot.nextRowId()))
+ .isTrue();
+ }
+
+ Snapshot normalSnapshot = checkNotNull(store.snapshotManager().latestSnapshot());
+ assertThat(normalSnapshot.properties())
+ .containsEntry("keep", "v1")
+ .doesNotContainKey(Snapshot.ROW_ID_REASSIGN_PROPERTY);
+ }
+
+ @Test
+ public void testRowIdReassignConflictFromOptions() throws Exception {
Review Comment:
suggestion: Consider adding a test that verifies `compactManifestOnce` also
strips `ROW_ID_REASSIGN_PROPERTY`. Current tests cover `replaceManifestList`
but not the compact path.
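Such a test could reuse the property check that `testReplaceManifestListWithRowIdReassignProperty` already performs. Below is a minimal sketch of that check as a plain helper. It assumes the same expectations the existing test asserts: every non-marker entry is kept and the marker is dropped. `ReassignMarkerCheck`, `checkMarkerStripped`, and the marker string are hypothetical. How the snapshot produced by `compactManifestOnce` is obtained is left to the real test setup, since that method's signature is not shown here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical test helper; the marker value is a placeholder for
// Snapshot.ROW_ID_REASSIGN_PROPERTY, whose actual string the PR does not show.
final class ReassignMarkerCheck {

    static final String ROW_ID_REASSIGN_PROPERTY = "row-id-reassign-placeholder";

    private ReassignMarkerCheck() {}

    // Fails if `after` still carries the marker, or lost/changed any other entry of `before`.
    static void checkMarkerStripped(Map<String, String> before, Map<String, String> after) {
        if (after.containsKey(ROW_ID_REASSIGN_PROPERTY)) {
            throw new AssertionError("reassign marker was carried forward: " + after);
        }
        for (Map.Entry<String, String> entry : before.entrySet()) {
            String key = entry.getKey();
            if (ROW_ID_REASSIGN_PROPERTY.equals(key)) {
                continue; // the one-shot marker is expected to disappear
            }
            if (!after.containsKey(key) || !Objects.equals(entry.getValue(), after.get(key))) {
                throw new AssertionError("property lost or changed: " + key);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> reassign = new HashMap<>();
        reassign.put("keep", "v1");
        reassign.put(ROW_ID_REASSIGN_PROPERTY, "true");

        Map<String, String> followUp = new HashMap<>();
        followUp.put("keep", "v1");

        checkMarkerStripped(reassign, followUp); // passes: marker gone, "keep" retained
        try {
            checkMarkerStripped(reassign, reassign); // marker still present
        } catch (AssertionError expected) {
            System.out.println(expected.getMessage().startsWith("reassign marker")); // true
        }
    }
}
```

In a compact-path test, `before` would be the properties of the snapshot written with `reassignProperties`, and `after` would be those of the snapshot written by `compactManifestOnce`.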
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java:
##########
@@ -506,22 +512,39 @@ private static List<Committable> createDeleteCommittables(
}
private static void commit(
- FileStoreTable table, String indexType, DataStream<Committable> written) {
+ FileStoreTable table,
+ String indexType,
+ DataStream<Committable> written,
+ Long rowIdReassignCheckFromSnapshot) {
+ FileStoreTable commitTable = withRowIdReassignCheck(table, rowIdReassignCheckFromSnapshot);
OneInputStreamOperatorFactory<Committable, Committable> committerOperator =
new CommitterOperatorFactory<>(
false,
true,
"GenericIndexCommitter-" + indexType + "-" + UUID.randomUUID(),
context ->
new StoreCommitter(
- table, table.newCommit(context.commitUser()), context),
+ commitTable,
+ commitTable.newCommit(context.commitUser()),
+ context),
new NoopCommittableStateManager());
written.transform("COMMIT OPERATOR", new CommittableTypeInfo(), committerOperator)
.setParallelism(1)
.setMaxParallelism(1);
}
+ private static FileStoreTable withRowIdReassignCheck(
+ FileStoreTable table, Long rowIdReassignCheckFromSnapshot) {
+ if (rowIdReassignCheckFromSnapshot == null) {
+ return table;
+ }
+ return table.copy(
+ Collections.singletonMap(
+ CoreOptions.COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT.key(),
+ String.valueOf(rowIdReassignCheckFromSnapshot)));
+ }
+
Review Comment:
nit: This method is identical to
`BTreeIndexTopoBuilder.withRowIdReassignCheck`. Consider extracting it to a shared
utility to avoid drift if the option key or logic changes later. Not blocking,
since the method is small.
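Here is a minimal sketch of the suggested extraction, assuming a hypothetical utility class `RowIdReassignChecks` that both topo builders would call. To keep the snippet self-contained, the `Table` interface stands in for `FileStoreTable`. It models only the `copy(Map<String, String>)` call that appears in the diff. `LAST_SAFE_SNAPSHOT_KEY` is a placeholder for `CoreOptions.COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT.key()`, and `DemoTable` exists only for the demo.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical shared helper for GenericIndexTopoBuilder and BTreeIndexTopoBuilder.
final class RowIdReassignChecks {

    // Placeholder for CoreOptions.COMMIT_ROW_ID_REASSIGN_LAST_SAFE_SNAPSHOT.key().
    static final String LAST_SAFE_SNAPSHOT_KEY = "last-safe-snapshot-placeholder";

    // Stand-in for FileStoreTable; only the copy(Map) call used in the diff is modeled.
    interface Table {
        Table copy(Map<String, String> dynamicOptions);
    }

    private RowIdReassignChecks() {}

    // Same logic as both builders: no scanned snapshot -> unchanged table,
    // otherwise a copy carrying the last-safe-snapshot option.
    static Table withRowIdReassignCheck(Table table, Long rowIdReassignCheckFromSnapshot) {
        if (rowIdReassignCheckFromSnapshot == null) {
            return table;
        }
        return table.copy(
                Collections.singletonMap(
                        LAST_SAFE_SNAPSHOT_KEY, String.valueOf(rowIdReassignCheckFromSnapshot)));
    }

    // Demo-only table that records the options it was copied with.
    static final class DemoTable implements Table {
        final Map<String, String> options;

        DemoTable(Map<String, String> options) {
            this.options = options;
        }

        @Override
        public Table copy(Map<String, String> dynamicOptions) {
            Map<String, String> merged = new HashMap<>(options);
            merged.putAll(dynamicOptions);
            return new DemoTable(merged);
        }
    }

    public static void main(String[] args) {
        DemoTable base = new DemoTable(new HashMap<>());
        System.out.println(withRowIdReassignCheck(base, null) == base); // true
        DemoTable checked = (DemoTable) withRowIdReassignCheck(base, 5L);
        System.out.println(checked.options); // {last-safe-snapshot-placeholder=5}
    }
}
```

In the real code, the helper would take and return `FileStoreTable` directly and reference the `CoreOptions` key. That way, a future change to the option touches one place instead of two.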