This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b72892272c [core] Add eager failure for fallback branch read failures 
(#7765)
b72892272c is described below

commit b72892272c32106e92cf2b02d5c5d354254e730c
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu May 7 15:35:09 2026 +0530

    [core] Add eager failure for fallback branch read failures (#7765)
    
    - When reading from a fallback branch fails, the exception is swallowed
    and the reader falls through to the current branch. This can mask data
    issues, and the LOG.error does not include the exception object so the
    stack trace was lost.
    - Added `scan.fallback-branch.read-fail-fast` (default false). When
    enabled the exception is propagated to the caller so the read fails
    immediately. When disabled the failure is logged with the full stack
    trace and the reader still falls through to the current branch.
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 10 +++
 .../paimon/table/FallbackReadFileStoreTable.java   | 22 +++++-
 .../table/FallbackReadFileStoreTableTest.java      | 90 ++++++++++++++++++++++
 4 files changed, 126 insertions(+), 2 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index af7e996c8b..a1b016fecf 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1187,6 +1187,12 @@ This config option does not affect the default 
filesystem metastore.</td>
             <td>String</td>
             <td>When a batch job queries from a table, if a partition does not 
exist in the current branch, the reader will try to get this partition from 
this fallback branch.</td>
         </tr>
+        <tr>
+            <td><h5>scan.fallback-branch.read-fail-fast</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to fail the read immediately when reading from a 
fallback branch throws. By default the failure is logged with the full stack 
trace and the reader falls through to the current branch, which can mask data 
issues. Set this to true to surface fallback branch errors to the caller 
instead.</td>
+        </tr>
         <tr>
             <td><h5>scan.fallback-delta-branch</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 2f24db43f4..3364ca4d39 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1972,6 +1972,16 @@ public class CoreOptions implements Serializable {
                             "When a batch job queries from a table, if a 
partition does not exist in the current branch, "
                                     + "the reader will try to get this 
partition from this fallback branch.");
 
+    public static final ConfigOption<Boolean> 
SCAN_FALLBACK_BRANCH_READ_FAIL_FAST =
+            key("scan.fallback-branch.read-fail-fast")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to fail the read immediately when reading 
from a fallback branch throws. "
+                                    + "By default the failure is logged with 
the full stack trace and the reader "
+                                    + "falls through to the current branch, 
which can mask data issues. "
+                                    + "Set this to true to surface fallback 
branch errors to the caller instead.");
+
     public static final ConfigOption<String> SCAN_PRIMARY_BRANCH =
             key("scan.primary-branch")
                     .stringType()
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index addca008d1..5c32135f76 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -573,12 +573,17 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
 
         private final InnerTableRead mainRead;
         private final InnerTableRead fallbackRead;
+        private final boolean fallbackReadFailFast;
 
         private Read() {
             FileStoreTable first = wrappedFirst ? wrapped : other;
             FileStoreTable second = wrappedFirst ? other : wrapped;
             this.mainRead = first.newRead();
             this.fallbackRead = second.newRead();
+            this.fallbackReadFailFast =
+                    wrapped.coreOptions()
+                            .toConfiguration()
+                            
.get(CoreOptions.SCAN_FALLBACK_BRANCH_READ_FAIL_FAST);
         }
 
         @Override
@@ -623,10 +628,23 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
                 if (fallbackSplit.isFallback()) {
                     try {
                         return 
fallbackRead.createReader(fallbackSplit.wrapped());
-                    } catch (Exception ignored) {
+                    } catch (Exception e) {
+                        if (fallbackReadFailFast) {
+                            if (e instanceof IOException) {
+                                throw (IOException) e;
+                            }
+                            if (e instanceof RuntimeException) {
+                                throw (RuntimeException) e;
+                            }
+                            throw new IOException(
+                                    "Failed to read fallback branch split: "
+                                            + fallbackSplit.wrapped(),
+                                    e);
+                        }
                         LOG.error(
                                 "Reading from supplemental branch has 
problems: {}",
-                                fallbackSplit.wrapped());
+                                fallbackSplit.wrapped(),
+                                e);
                     }
                 }
             }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
index 6a1f782fd2..7f586875a1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
@@ -34,7 +35,9 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -42,10 +45,13 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.TraceableFileIO;
 
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mockito;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -53,6 +59,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.table.SchemaEvolutionTableTestBase.rowData;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link FallbackReadFileStoreTable}. */
 public class FallbackReadFileStoreTableTest {
@@ -242,6 +249,89 @@ public class FallbackReadFileStoreTableTest {
         assertThat(mergedPartitions).containsExactlyInAnyOrder(1, 2, 3);
     }
 
+    @Test
+    public void testFallbackReadFailFastDefaultSwallowsException() throws 
Exception {
+        FallbackReadFileStoreTable table = 
setUpTableWithThrowingFallback(false);
+        Split split = onlyFallbackSplit(table);
+
+        // Default behavior: the failing fallback read is swallowed and the 
reader
+        // falls through to the main branch, which has no data for partition 3 
and
+        // either returns an empty reader or throws something other than the
+        // injected fallback exception.
+        try {
+            table.newRead().createReader(split);
+        } catch (Exception e) {
+            assertThat(e.getMessage())
+                    .as("fallback exception must not propagate when fail-fast 
is disabled")
+                    .doesNotContain("injected fallback failure");
+        }
+    }
+
+    @Test
+    public void testFallbackReadFailFastPropagatesException() throws Exception 
{
+        FallbackReadFileStoreTable table = 
setUpTableWithThrowingFallback(true);
+        Split split = onlyFallbackSplit(table);
+
+        assertThatThrownBy(() -> table.newRead().createReader(split))
+                .hasMessageContaining("injected fallback failure");
+    }
+
+    private FallbackReadFileStoreTable setUpTableWithThrowingFallback(boolean 
failFast)
+            throws Exception {
+        String branchName = "bc";
+        FileStoreTable mainTable = createTable();
+        writeDataIntoTable(mainTable, 0, rowData(1, 10));
+        mainTable.createBranch(branchName);
+        FileStoreTable branchTable = createTableFromBranch(mainTable, 
branchName);
+        writeDataIntoTable(branchTable, 0, rowData(3, 60));
+
+        Options overrides = new Options();
+        overrides.set(CoreOptions.SCAN_FALLBACK_BRANCH_READ_FAIL_FAST, 
failFast);
+        FileStoreTable mainWithOption = mainTable.copy(overrides.toMap());
+
+        FileStoreTable spyBranch = Mockito.spy(branchTable);
+        InnerTableRead throwing = throwingInnerTableRead();
+        Mockito.doReturn(throwing).when(spyBranch).newRead();
+
+        return new FallbackReadFileStoreTable(mainWithOption, spyBranch, true);
+    }
+
+    private static Split onlyFallbackSplit(FallbackReadFileStoreTable table) {
+        DataTableScan scan = table.newScan();
+        scan.withFilter(new PredicateBuilder(ROW_TYPE).equal(0, 3));
+        List<Split> splits = scan.plan().splits();
+        assertThat(splits).hasSize(1);
+        FallbackReadFileStoreTable.FallbackSplit fs =
+                (FallbackReadFileStoreTable.FallbackSplit) splits.get(0);
+        assertThat(fs.isFallback()).isTrue();
+        return splits.get(0);
+    }
+
+    private static InnerTableRead throwingInnerTableRead() {
+        return new InnerTableRead() {
+            @Override
+            public InnerTableRead withFilter(Predicate predicate) {
+                return this;
+            }
+
+            @Override
+            public InnerTableRead withReadType(RowType readType) {
+                return this;
+            }
+
+            @Override
+            public TableRead withIOManager(org.apache.paimon.disk.IOManager 
ioManager) {
+                return this;
+            }
+
+            @Override
+            public org.apache.paimon.reader.RecordReader<InternalRow> 
createReader(Split split)
+                    throws IOException {
+                throw new IOException("injected fallback failure");
+            }
+        };
+    }
+
     private void writeDataIntoTable(
             FileStoreTable table, long commitIdentifier, InternalRow... 
allData) throws Exception {
         StreamTableWrite write = table.newWrite(commitUser);

Reply via email to