This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b72892272c [core] Add eager failure for fallback branch read failures
(#7765)
b72892272c is described below
commit b72892272c32106e92cf2b02d5c5d354254e730c
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu May 7 15:35:09 2026 +0530
[core] Add eager failure for fallback branch read failures (#7765)
- When reading from a fallback branch fails, the exception is swallowed
and the reader falls through to the current branch. This can mask data
issues, and the LOG.error call does not include the exception object, so
the stack trace is lost.
- Added `scan.fallback-branch.read-fail-fast` (default false). When
enabled, the exception is propagated to the caller so the read fails
immediately. When disabled, the failure is logged with the full stack
trace and the reader still falls through to the current branch.
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 10 +++
.../paimon/table/FallbackReadFileStoreTable.java | 22 +++++-
.../table/FallbackReadFileStoreTableTest.java | 90 ++++++++++++++++++++++
4 files changed, 126 insertions(+), 2 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index af7e996c8b..a1b016fecf 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1187,6 +1187,12 @@ This config option does not affect the default
filesystem metastore.</td>
<td>String</td>
<td>When a batch job queries from a table, if a partition does not
exist in the current branch, the reader will try to get this partition from
this fallback branch.</td>
</tr>
+ <tr>
+ <td><h5>scan.fallback-branch.read-fail-fast</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to fail the read immediately when reading from a
fallback branch throws. By default the failure is logged with the full stack
trace and the reader falls through to the current branch, which can mask data
issues. Set this to true to surface fallback branch errors to the caller
instead.</td>
+ </tr>
<tr>
<td><h5>scan.fallback-delta-branch</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 2f24db43f4..3364ca4d39 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1972,6 +1972,16 @@ public class CoreOptions implements Serializable {
"When a batch job queries from a table, if a
partition does not exist in the current branch, "
+ "the reader will try to get this
partition from this fallback branch.");
+ public static final ConfigOption<Boolean>
SCAN_FALLBACK_BRANCH_READ_FAIL_FAST =
+ key("scan.fallback-branch.read-fail-fast")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to fail the read immediately when reading
from a fallback branch throws. "
+ + "By default the failure is logged with
the full stack trace and the reader "
+ + "falls through to the current branch,
which can mask data issues. "
+ + "Set this to true to surface fallback
branch errors to the caller instead.");
+
public static final ConfigOption<String> SCAN_PRIMARY_BRANCH =
key("scan.primary-branch")
.stringType()
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index addca008d1..5c32135f76 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -573,12 +573,17 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
private final InnerTableRead mainRead;
private final InnerTableRead fallbackRead;
+ private final boolean fallbackReadFailFast;
private Read() {
FileStoreTable first = wrappedFirst ? wrapped : other;
FileStoreTable second = wrappedFirst ? other : wrapped;
this.mainRead = first.newRead();
this.fallbackRead = second.newRead();
+ this.fallbackReadFailFast =
+ wrapped.coreOptions()
+ .toConfiguration()
+
.get(CoreOptions.SCAN_FALLBACK_BRANCH_READ_FAIL_FAST);
}
@Override
@@ -623,10 +628,23 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
if (fallbackSplit.isFallback()) {
try {
return
fallbackRead.createReader(fallbackSplit.wrapped());
- } catch (Exception ignored) {
+ } catch (Exception e) {
+ if (fallbackReadFailFast) {
+ if (e instanceof IOException) {
+ throw (IOException) e;
+ }
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ }
+ throw new IOException(
+ "Failed to read fallback branch split: "
+ + fallbackSplit.wrapped(),
+ e);
+ }
LOG.error(
"Reading from supplemental branch has
problems: {}",
- fallbackSplit.wrapped());
+ fallbackSplit.wrapped(),
+ e);
}
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
index 6a1f782fd2..7f586875a1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
@@ -34,7 +35,9 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -42,10 +45,13 @@ import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.TraceableFileIO;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mockito;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -53,6 +59,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.table.SchemaEvolutionTableTestBase.rowData;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link FallbackReadFileStoreTable}. */
public class FallbackReadFileStoreTableTest {
@@ -242,6 +249,89 @@ public class FallbackReadFileStoreTableTest {
assertThat(mergedPartitions).containsExactlyInAnyOrder(1, 2, 3);
}
+ @Test
+ public void testFallbackReadFailFastDefaultSwallowsException() throws
Exception {
+ FallbackReadFileStoreTable table =
setUpTableWithThrowingFallback(false);
+ Split split = onlyFallbackSplit(table);
+
+ // Default behavior: the failing fallback read is swallowed and the
reader
+ // falls through to the main branch, which has no data for partition 3
and
+ // either returns an empty reader or throws something other than the
+ // injected fallback exception.
+ try {
+ table.newRead().createReader(split);
+ } catch (Exception e) {
+ assertThat(e.getMessage())
+ .as("fallback exception must not propagate when fail-fast
is disabled")
+ .doesNotContain("injected fallback failure");
+ }
+ }
+
+ @Test
+ public void testFallbackReadFailFastPropagatesException() throws Exception
{
+ FallbackReadFileStoreTable table =
setUpTableWithThrowingFallback(true);
+ Split split = onlyFallbackSplit(table);
+
+ assertThatThrownBy(() -> table.newRead().createReader(split))
+ .hasMessageContaining("injected fallback failure");
+ }
+
+ private FallbackReadFileStoreTable setUpTableWithThrowingFallback(boolean
failFast)
+ throws Exception {
+ String branchName = "bc";
+ FileStoreTable mainTable = createTable();
+ writeDataIntoTable(mainTable, 0, rowData(1, 10));
+ mainTable.createBranch(branchName);
+ FileStoreTable branchTable = createTableFromBranch(mainTable,
branchName);
+ writeDataIntoTable(branchTable, 0, rowData(3, 60));
+
+ Options overrides = new Options();
+ overrides.set(CoreOptions.SCAN_FALLBACK_BRANCH_READ_FAIL_FAST,
failFast);
+ FileStoreTable mainWithOption = mainTable.copy(overrides.toMap());
+
+ FileStoreTable spyBranch = Mockito.spy(branchTable);
+ InnerTableRead throwing = throwingInnerTableRead();
+ Mockito.doReturn(throwing).when(spyBranch).newRead();
+
+ return new FallbackReadFileStoreTable(mainWithOption, spyBranch, true);
+ }
+
+ private static Split onlyFallbackSplit(FallbackReadFileStoreTable table) {
+ DataTableScan scan = table.newScan();
+ scan.withFilter(new PredicateBuilder(ROW_TYPE).equal(0, 3));
+ List<Split> splits = scan.plan().splits();
+ assertThat(splits).hasSize(1);
+ FallbackReadFileStoreTable.FallbackSplit fs =
+ (FallbackReadFileStoreTable.FallbackSplit) splits.get(0);
+ assertThat(fs.isFallback()).isTrue();
+ return splits.get(0);
+ }
+
+ private static InnerTableRead throwingInnerTableRead() {
+ return new InnerTableRead() {
+ @Override
+ public InnerTableRead withFilter(Predicate predicate) {
+ return this;
+ }
+
+ @Override
+ public InnerTableRead withReadType(RowType readType) {
+ return this;
+ }
+
+ @Override
+ public TableRead withIOManager(org.apache.paimon.disk.IOManager
ioManager) {
+ return this;
+ }
+
+ @Override
+ public org.apache.paimon.reader.RecordReader<InternalRow>
createReader(Split split)
+ throws IOException {
+ throw new IOException("injected fallback failure");
+ }
+ };
+ }
+
private void writeDataIntoTable(
FileStoreTable table, long commitIdentifier, InternalRow...
allData) throws Exception {
StreamTableWrite write = table.newWrite(commitUser);