This is an automated email from the ASF dual-hosted git repository.

vinish pushed a commit to branch fallbackForLogTruncation
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git

commit bbfa13eb61accf9ad6b3aebd5d8b89a0a5053f7f
Author: Vinish Reddy <[email protected]>
AuthorDate: Mon Jan 12 08:09:19 2026 -0800

    Add fallback for log truncation issue in Delta source
---
 .../apache/xtable/delta/DeltaConversionSource.java | 60 ++++++++++++--
 .../xtable/kernel/DeltaKernelConversionSource.java | 43 +++++++++-
 .../xtable/delta/ITDeltaConversionSource.java      | 95 +++++++++++++++++++++-
 3 files changed, 186 insertions(+), 12 deletions(-)

diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index bb40a315..3cc7f091 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -41,6 +41,8 @@ import org.apache.spark.sql.delta.actions.AddFile;
 import org.apache.spark.sql.delta.actions.RemoveFile;
 
 import scala.Option;
+import scala.Tuple2;
+import scala.collection.Seq;
 
 import io.delta.tables.DeltaTable;
 
@@ -191,18 +193,58 @@ public class DeltaConversionSource implements ConversionSource<Long> {
   }
 
   /*
-   * In Delta Lake, each commit is a self-describing one i.e. it contains list of new files while
-   * also containing list of files that were deleted. So, vacuum has no special effect on the
-   * incremental sync. Hence, existence of commit is the only check required.
+   * The following checks are performed:
+   * 1. Check if a commit exists at or before the provided instant.
+   * 2. Verify that the commit files needed for incremental sync are still accessible.
+   *
+   * Delta Lake's metadata cleanup (log retention) removes old JSON commit files from
+   * _delta_log/, which can break incremental sync even though commits are self-describing.
+   * This method attempts to access the commit chain to ensure the log has not been truncated.
    */
   @Override
   public boolean isIncrementalSyncSafeFrom(Instant instant) {
-    DeltaHistoryManager.Commit deltaCommitAtOrBeforeInstant =
-        deltaLog.history().getActiveCommitAtTime(Timestamp.from(instant), true, false, true);
-    // There is a chance earliest commit of the table is returned if the instant is before the
-    // earliest commit of the table, hence the additional check.
-    Instant deltaCommitInstant = Instant.ofEpochMilli(deltaCommitAtOrBeforeInstant.getTimestamp());
-    return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant);
+    try {
+      DeltaHistoryManager.Commit deltaCommitAtOrBeforeInstant =
+          deltaLog.history().getActiveCommitAtTime(Timestamp.from(instant), true, false, true);
+
+      // There is a chance earliest commit of the table is returned if the instant is before the
+      // earliest commit of the table, hence the additional check.
+      Instant deltaCommitInstant = Instant.ofEpochMilli(deltaCommitAtOrBeforeInstant.getTimestamp());
+      if (deltaCommitInstant.isAfter(instant)) {
+        log.info(
+            "No commit found at or before instant {}. Earliest available 
commit is at {}",
+            instant,
+            deltaCommitInstant);
+        return false;
+      }
+
+      long versionAtInstant = deltaCommitAtOrBeforeInstant.version();
+
+      // Verify that we can actually access commit files from this version onward by attempting
+      // to read the changes. This will fail if log cleanup has removed the necessary commit files.
+      // We only need to verify we can start iterating - we don't need to consume all changes.
+      scala.collection.Iterator<Tuple2<Object, Seq<Action>>> changesIterator =
+          deltaLog.getChanges(versionAtInstant, true);
+
+      // Test if we can access at least the first commit. If commit files are missing due to
+      // log truncation, this will throw an exception (typically a FileNotFoundException).
+      if (changesIterator.hasNext()) {
+        // Successfully verified we can access commit files
+        return true;
+      } else {
+        // No changes available from this version (shouldn't happen for valid commits)
+        log.warn(
+            "No changes available starting from version {} (instant: {})", 
versionAtInstant, instant);
+        return false;
+      }
+    } catch (Exception e) {
+      // Commit files have been truncated by log cleanup or are otherwise inaccessible
+      log.info(
+          "Cannot perform incremental sync from instant {} due to missing or 
inaccessible commit files: {}",
+          instant,
+          e.getMessage());
+      return false;
+    }
   }
 
   @Override
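
For context, a caller can use this check to decide between an incremental and a
full sync. Below is a minimal sketch (not part of this patch) of how a conversion
controller might consume the result, assuming the existing ConversionSource /
CommitsBacklog / InstantsForIncrementalSync API and an illustrative
lastSyncInstant variable:

    if (conversionSource.isIncrementalSyncSafeFrom(lastSyncInstant)) {
      // Commit files are intact: replay only the commits since the last sync.
      CommitsBacklog<Long> backlog =
          conversionSource.getCommitsBacklog(
              InstantsForIncrementalSync.builder().lastSyncInstant(lastSyncInstant).build());
      for (Long version : backlog.getCommitsToProcess()) {
        TableChange change = conversionSource.getTableChangeForCommit(version);
        // ... apply the change to the target format ...
      }
    } else {
      // Log was truncated: fall back to a full sync from the current snapshot.
      InternalTable currentTable = conversionSource.getCurrentTable();
      // ... bootstrap the target format from currentTable ...
    }
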
diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java
index fa088f08..631c69e5 100644
--- a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java
@@ -24,21 +24,27 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import lombok.Builder;
 import lombok.extern.slf4j.Slf4j;
 
 import io.delta.kernel.Snapshot;
 import io.delta.kernel.Table;
+import io.delta.kernel.data.ColumnarBatch;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.DeltaLogActionUtils;
 import io.delta.kernel.internal.SnapshotImpl;
+import io.delta.kernel.internal.TableImpl;
 import io.delta.kernel.internal.actions.AddFile;
 import io.delta.kernel.internal.actions.RemoveFile;
 import io.delta.kernel.internal.actions.RowBackedAction;
 import io.delta.kernel.internal.util.VectorUtils;
+import io.delta.kernel.utils.CloseableIterator;
 
 import org.apache.xtable.exception.ReadException;
 import org.apache.xtable.model.CommitsBacklog;
@@ -192,9 +198,42 @@ public class DeltaKernelConversionSource implements ConversionSource<Long> {
       // There is a chance earliest commit of the table is returned if the instant is before the
       // earliest commit of the table, hence the additional check.
       Instant deltaCommitInstant = Instant.ofEpochMilli(snapshot.getTimestamp(engine));
-      return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant);
+      if (deltaCommitInstant.isAfter(instant)) {
+        log.info(
+            "No commit found at or before instant {}. Earliest available 
commit is at {}",
+            instant,
+            deltaCommitInstant);
+        return false;
+      }
+
+      long versionAtInstant = snapshot.getVersion();
+      long currentVersion = table.getLatestSnapshot(engine).getVersion();
+
+      // Verify that we can actually access commit files from this version to the current
+      // version. This will fail if log cleanup has removed the necessary commit files.
+      Set<DeltaLogActionUtils.DeltaAction> actionSet = new HashSet<>();
+      actionSet.add(DeltaLogActionUtils.DeltaAction.ADD);
+      actionSet.add(DeltaLogActionUtils.DeltaAction.REMOVE);
+
+      TableImpl tableImpl = (TableImpl) table;
+      // Attempt to get changes - this will throw if commit files are missing
+      try (CloseableIterator<ColumnarBatch> changesIterator =
+          tableImpl.getChanges(engine, versionAtInstant, currentVersion, actionSet)) {
+        // Calling hasNext() forces the first read and throws if the commit files are
+        // missing; a successful read (even of an empty result) means sync is safe.
+        changesIterator.hasNext();
+        return true;
+      }
     } catch (Exception e) {
-      log.error("Error checking if incremental sync is safe from " + instant, e);
+      // Commit files have been truncated by log cleanup or are otherwise inaccessible
+      log.info(
+          "Cannot perform incremental sync from instant {} due to missing or 
inaccessible commit files: {}",
+          instant,
+          e.getMessage());
       return false;
     }
   }
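
For reference, the commit files probed above live under _delta_log/ and are named
by zero-padded 20-digit version, e.g. 00000000000000000000.json. A minimal
standalone sketch of the failure mode this guards against, using java.nio.file
(the table path is illustrative; real reads go through DeltaLog / the kernel):

    Path deltaLogDir = Paths.get("/tmp/my_table/_delta_log"); // hypothetical path
    long startVersion = 0L;
    Path commitJson = deltaLogDir.resolve(String.format("%020d.json", startVersion));
    if (!Files.exists(commitJson)) {
      // The log has been truncated past startVersion; reading changes from it
      // would throw (e.g. FileNotFoundException), so only a full snapshot sync is safe.
    }
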
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java
index 3a754e27..fc855a59 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java
@@ -21,8 +21,10 @@ package org.apache.xtable.delta;
 import static org.apache.xtable.testutil.ITTestUtils.validateTable;
 import static org.junit.jupiter.api.Assertions.*;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Instant;
@@ -422,7 +424,7 @@ public class ITDeltaConversionSource {
   }
 
   @Test
-  public void testsShowingVacuumHasNoEffectOnIncrementalSync() {
+  public void testVacuumAffectsIncrementalSyncSafety() {
     boolean isPartitioned = true;
     String tableName = GenericTable.getTableName();
     TestSparkDeltaTable testSparkDeltaTable =
@@ -472,6 +474,97 @@ public class ITDeltaConversionSource {
     assertFalse(conversionSource.isIncrementalSyncSafeFrom(instantAsOfHourAgo));
   }
 
+  @Test
+  public void testIncrementalSyncSafeWhenCommitFilesExist() {
+    String tableName = GenericTable.getTableName();
+    TestSparkDeltaTable testSparkDeltaTable =
+        new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
+
+    // Insert initial data
+    testSparkDeltaTable.insertRows(50);
+    Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
+
+    // Add more commits
+    testSparkDeltaTable.insertRows(50);
+
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+
+    // Should be safe - commit files still exist
+    assertTrue(
+        conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)),
+        "Incremental sync should be safe when commit files exist");
+  }
+
+  @Test
+  public void testIncrementalSyncUnsafeForInstantBeforeEarliestCommit() {
+    String tableName = GenericTable.getTableName();
+    TestSparkDeltaTable testSparkDeltaTable =
+        new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
+
+    // Insert data
+    testSparkDeltaTable.insertRows(50);
+
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+
+    // Try instant from 1 hour ago (before table existed)
+    Instant instantHourAgo = Instant.now().minus(1, ChronoUnit.HOURS);
+    assertFalse(
+        conversionSource.isIncrementalSyncSafeFrom(instantHourAgo),
+        "Incremental sync should be unsafe for instant before earliest 
commit");
+  }
+
+  @Test
+  public void testIncrementalSyncUnsafeAfterManualCommitFileDeletion() throws IOException {
+    String tableName = GenericTable.getTableName();
+    TestSparkDeltaTable testSparkDeltaTable =
+        new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
+
+    testSparkDeltaTable.insertRows(50);
+    Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
+    testSparkDeltaTable.insertRows(50);
+    testSparkDeltaTable.insertRows(50);
+
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
+            .build();
+
+    DeltaConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+
+    assertTrue(
+        conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)),
+        "Should be safe before deleting commit files");
+
+    // Delete the first commit file to simulate log truncation or filesystem corruption
+    Path commitFile =
+        Paths.get(testSparkDeltaTable.getBasePath(), "_delta_log", "00000000000000000000.json");
+    assertTrue(Files.exists(commitFile), "Expected the first commit file to exist");
+    Files.delete(commitFile);
+
+    conversionSource = conversionSourceProvider.getConversionSourceInstance(tableConfig);
+
+    assertFalse(
+        conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)),
+        "Should be unsafe after manually deleting commit files");
+  }
+
   @ParameterizedTest
   @MethodSource("testWithPartitionToggle")
   public void testVacuum(boolean isPartitioned) {
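
A note on reproducing this outside the test: the test simulates truncation by
deleting a log file directly, but the same state arises from Delta's retention
settings, since expired _delta_log JSON files are cleaned up when a new
checkpoint is written. A hedged sketch (table path and values are illustrative):

    // Hypothetical setup: expire log files aggressively so the next checkpoint's
    // cleanup removes old JSON commits, exercising the fallback path above.
    sparkSession.sql(
        "ALTER TABLE delta.`/tmp/my_table` SET TBLPROPERTIES ("
            + "'delta.logRetentionDuration' = 'interval 0 hours', "
            + "'delta.checkpointInterval' = '2')");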
