the-other-tim-brown commented on code in PR #780:
URL: https://github.com/apache/incubator-xtable/pull/780#discussion_r2656712406
##########
xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java:
##########
@@ -114,18 +116,125 @@ private Snapshot getLastSnapshot() {
@Override
public TableChange getTableChangeForCommit(Snapshot snapshot) {
- throw new UnsupportedOperationException("Incremental Sync is not supported
yet.");
+ InternalTable tableAtSnapshot = getTable(snapshot);
+ InternalSchema internalSchema = tableAtSnapshot.getReadSchema();
+
+ InternalFilesDiff filesDiff =
+ dataFileExtractor.extractFilesDiff(paimonTable, snapshot,
internalSchema);
+
+ return TableChange.builder()
+ .tableAsOfChange(tableAtSnapshot)
+ .filesDiff(filesDiff)
+ .sourceIdentifier(getCommitIdentifier(snapshot))
+ .build();
}
@Override
public CommitsBacklog<Snapshot> getCommitsBacklog(
InstantsForIncrementalSync instantsForIncrementalSync) {
- throw new UnsupportedOperationException("Incremental Sync is not supported
yet.");
+ Instant lastSyncInstant = instantsForIncrementalSync.getLastSyncInstant();
+ long lastSyncTimeMillis = lastSyncInstant.toEpochMilli();
+
+ log.info(
+ "Getting commits backlog for Paimon table {} from instant {}",
+ paimonTable.name(),
+ lastSyncInstant);
+
+ Iterator<Snapshot> snapshotIterator;
+ try {
+ snapshotIterator = snapshotManager.snapshots();
+ } catch (IOException e) {
+ throw new ReadException("Could not iterate over the Paimon snapshot
list", e);
+ }
+
+ List<Snapshot> snapshotsToProcess = new ArrayList<>();
+ while (snapshotIterator.hasNext()) {
+ Snapshot snapshot = snapshotIterator.next();
+ // Only include snapshots committed after the last sync
+ if (snapshot.timeMillis() > lastSyncTimeMillis) {
+ snapshotsToProcess.add(snapshot);
+ log.debug(
+ "Including snapshot {} (time={}, commitId={}) in backlog",
+ snapshot.id(),
+ snapshot.timeMillis(),
+ snapshot.commitIdentifier());
+ }
+ }
+
+ log.info("Found {} snapshots to process for incremental sync",
snapshotsToProcess.size());
+
+ return CommitsBacklog.<Snapshot>builder()
+ .commitsToProcess(snapshotsToProcess)
+ .inFlightInstants(Collections.emptyList())
+ .build();
}
@Override
public boolean isIncrementalSyncSafeFrom(Instant instant) {
- return false; // Incremental sync is not supported yet
+ long timeInMillis = instant.toEpochMilli();
+
+ Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ if (earliestSnapshotId == null || latestSnapshotId == null) {
+ log.warn("No snapshots found in table {}", paimonTable.name());
+ return false;
+ }
+
+ Snapshot earliestSnapshot = snapshotManager.snapshot(earliestSnapshotId);
+ Snapshot latestSnapshot = snapshotManager.snapshot(latestSnapshotId);
+
+ // Check 1: If instant is in the future (after latest snapshot), return
false
+ if (timeInMillis > latestSnapshot.timeMillis()) {
+ log.warn(
+ "Instant {} is in the future. Latest snapshot {} has time {}",
+ instant,
+ latestSnapshot.id(),
+ latestSnapshot.timeMillis());
+ return false;
+ }
+
+ // Check 2: Has snapshot expiration affected this instant?
+ // If the earliest snapshot is newer than the requested instant,
+ // then snapshots have been expired and we can't do incremental sync
+ if (earliestSnapshot.timeMillis() > timeInMillis) {
+ log.warn(
+ "Incremental sync is not safe from instant {}. "
+ + "Earliest available snapshot {} (time={}) is newer than the
requested instant. "
+ + "Snapshots may have been expired.",
+ instant,
+ earliestSnapshot.id(),
+ earliestSnapshot.timeMillis());
+ return false;
+ }
+
+ // Check 3: Verify a snapshot exists at or before the instant
Review Comment:
Can this simply check that `earliestSnapshot.timeMillis() <=
timeInMillis`?
##########
xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java:
##########
@@ -147,6 +147,99 @@ void testColumnStatsAreEmpty() {
}
}
+ @Test
+ void testExtractFilesDiffWithNewFiles() {
+ createUnpartitionedTable();
+
+ // Insert initial data
+ testTable.insertRows(5);
+ org.apache.paimon.Snapshot firstSnapshot =
paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(firstSnapshot);
+
+ // Insert more data to create a second snapshot
+ testTable.insertRows(3);
+ org.apache.paimon.Snapshot secondSnapshot =
paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(secondSnapshot);
+
+ org.apache.xtable.model.storage.InternalFilesDiff filesDiff =
+ extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema);
+
+ // Verify we have added files
+ assertNotNull(filesDiff);
+ assertNotNull(filesDiff.getFilesAdded());
+ assertTrue(filesDiff.getFilesAdded().size() > 0);
+
+ // Verify removed files collection exists (size may vary based on
compaction behavior)
+ assertNotNull(filesDiff.getFilesRemoved());
Review Comment:
Is there any way to configure the test so that the sizes are predictable and
we can assert on them?
Also, should we assert that the number of files removed is non-zero?
##########
xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java:
##########
@@ -147,6 +147,99 @@ void testColumnStatsAreEmpty() {
}
}
+ @Test
+ void testExtractFilesDiffWithNewFiles() {
+ createUnpartitionedTable();
+
+ // Insert initial data
+ testTable.insertRows(5);
+ org.apache.paimon.Snapshot firstSnapshot =
paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(firstSnapshot);
+
+ // Insert more data to create a second snapshot
+ testTable.insertRows(3);
+ org.apache.paimon.Snapshot secondSnapshot =
paimonTable.snapshotManager().latestSnapshot();
Review Comment:
Can we import `org.apache.paimon.Snapshot` here?
##########
xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java:
##########
@@ -165,38 +167,105 @@ void
testGetCurrentSnapshotThrowsExceptionWhenNoSnapshot() {
}
@Test
- void testGetTableChangeForCommitThrowsUnsupportedOperationException() {
+ void testGetCommitsBacklogReturnsCommitsAfterLastSync() {
+ // Insert initial data to create first snapshot
+ testTable.insertRows(5);
+ Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(firstSnapshot);
+
+ // Insert more data to create second snapshot
testTable.insertRows(3);
- Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+ Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(secondSnapshot);
+ assertNotEquals(firstSnapshot.id(), secondSnapshot.id());
- UnsupportedOperationException exception =
- assertThrows(
- UnsupportedOperationException.class,
- () -> conversionSource.getTableChangeForCommit(snapshot));
+ // Get commits backlog from first snapshot time
+ InstantsForIncrementalSync instantsForSync =
+ InstantsForIncrementalSync.builder()
+ .lastSyncInstant(Instant.ofEpochMilli(firstSnapshot.timeMillis()))
+ .build();
+
+ CommitsBacklog<Snapshot> backlog =
conversionSource.getCommitsBacklog(instantsForSync);
+
+ // Verify we get at least the second snapshot (may get more if insertRows
creates multiple)
+ assertNotNull(backlog);
+ assertTrue(backlog.getCommitsToProcess().size() >= 1);
Review Comment:
If we cannot know the size for certain up front, we may want to assert that
the first snapshot is not in the list of commits to process.
##########
xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java:
##########
@@ -165,38 +167,105 @@ void
testGetCurrentSnapshotThrowsExceptionWhenNoSnapshot() {
}
@Test
- void testGetTableChangeForCommitThrowsUnsupportedOperationException() {
+ void testGetCommitsBacklogReturnsCommitsAfterLastSync() {
+ // Insert initial data to create first snapshot
+ testTable.insertRows(5);
+ Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(firstSnapshot);
+
+ // Insert more data to create second snapshot
testTable.insertRows(3);
- Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+ Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(secondSnapshot);
+ assertNotEquals(firstSnapshot.id(), secondSnapshot.id());
- UnsupportedOperationException exception =
- assertThrows(
- UnsupportedOperationException.class,
- () -> conversionSource.getTableChangeForCommit(snapshot));
+ // Get commits backlog from first snapshot time
+ InstantsForIncrementalSync instantsForSync =
+ InstantsForIncrementalSync.builder()
+ .lastSyncInstant(Instant.ofEpochMilli(firstSnapshot.timeMillis()))
+ .build();
+
+ CommitsBacklog<Snapshot> backlog =
conversionSource.getCommitsBacklog(instantsForSync);
+
+ // Verify we get at least the second snapshot (may get more if insertRows
creates multiple)
Review Comment:
I'm not familiar with Paimon. What would cause a single round of inserts to
create multiple snapshots?
##########
xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java:
##########
@@ -165,38 +167,105 @@ void
testGetCurrentSnapshotThrowsExceptionWhenNoSnapshot() {
}
@Test
- void testGetTableChangeForCommitThrowsUnsupportedOperationException() {
+ void testGetCommitsBacklogReturnsCommitsAfterLastSync() {
+ // Insert initial data to create first snapshot
+ testTable.insertRows(5);
+ Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(firstSnapshot);
+
+ // Insert more data to create second snapshot
testTable.insertRows(3);
- Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+ Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(secondSnapshot);
+ assertNotEquals(firstSnapshot.id(), secondSnapshot.id());
- UnsupportedOperationException exception =
- assertThrows(
- UnsupportedOperationException.class,
- () -> conversionSource.getTableChangeForCommit(snapshot));
+ // Get commits backlog from first snapshot time
+ InstantsForIncrementalSync instantsForSync =
+ InstantsForIncrementalSync.builder()
+ .lastSyncInstant(Instant.ofEpochMilli(firstSnapshot.timeMillis()))
+ .build();
+
+ CommitsBacklog<Snapshot> backlog =
conversionSource.getCommitsBacklog(instantsForSync);
+
+ // Verify we get at least the second snapshot (may get more if insertRows
creates multiple)
+ assertNotNull(backlog);
+ assertTrue(backlog.getCommitsToProcess().size() >= 1);
+ // Verify the last snapshot in the backlog is the second snapshot
+ assertEquals(
+ secondSnapshot.id(),
+ backlog.getCommitsToProcess().get(backlog.getCommitsToProcess().size()
- 1).id());
+ assertTrue(backlog.getInFlightInstants().isEmpty());
+ }
- assertEquals("Incremental Sync is not supported yet.",
exception.getMessage());
+ @Test
+ void testGetCommitsBacklogReturnsEmptyForFutureInstant() {
+ testTable.insertRows(5);
+
+ // Use a future instant
+ InstantsForIncrementalSync instantsForSync =
+ InstantsForIncrementalSync.builder()
+ .lastSyncInstant(Instant.now().plusSeconds(3600))
+ .build();
+
+ CommitsBacklog<Snapshot> backlog =
conversionSource.getCommitsBacklog(instantsForSync);
+
+ // Verify no snapshots are returned
+ assertNotNull(backlog);
+ assertTrue(backlog.getCommitsToProcess().isEmpty());
}
@Test
- void testGetCommitsBacklogThrowsUnsupportedOperationException() {
- InstantsForIncrementalSync mockInstants =
-
InstantsForIncrementalSync.builder().lastSyncInstant(Instant.now()).build();
+ void testGetTableChangeForCommitReturnsCorrectFilesDiff() {
+ // Insert initial data
+ testTable.insertRows(5);
+ Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(firstSnapshot);
- UnsupportedOperationException exception =
- assertThrows(
- UnsupportedOperationException.class,
- () -> conversionSource.getCommitsBacklog(mockInstants));
+ // Insert more data to create second snapshot
+ testTable.insertRows(3);
+ Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(secondSnapshot);
+
+ // Get table change for second snapshot
+ TableChange tableChange =
conversionSource.getTableChangeForCommit(secondSnapshot);
+
+ // Verify table change structure
+ assertNotNull(tableChange);
+ assertNotNull(tableChange.getFilesDiff());
+ assertNotNull(tableChange.getTableAsOfChange());
+ assertEquals(
+ Long.toString(secondSnapshot.commitIdentifier()),
tableChange.getSourceIdentifier());
- assertEquals("Incremental Sync is not supported yet.",
exception.getMessage());
+ // For append-only table, we should have added files and no removed files
+ assertTrue(tableChange.getFilesDiff().getFilesAdded().size() > 0);
}
@Test
- void testIsIncrementalSyncSafeFromReturnsFalse() {
- Instant testInstant = Instant.now();
+ void testIsIncrementalSyncSafeFromReturnsTrueForValidInstant() {
Review Comment:
Can we add a test where `isIncrementalSyncSafeFrom` returns false because
the instant is before the first snapshot?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]