This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 87db59412 [paimon] Improve npe error in FlussRecordAsPaimonRow (#2765)
87db59412 is described below
commit 87db59412f32ecbe8e49a470e06d156b032928dc
Author: Thorne <[email protected]>
AuthorDate: Wed Mar 4 13:47:07 2026 +0800
[paimon] Improve npe error in FlussRecordAsPaimonRow (#2765)
---
.../paimon/tiering/FlussRecordAsPaimonRow.java | 8 ++++++++
.../paimon/tiering/FlussRecordAsPaimonRowTest.java | 22 ++++++++++++++++++++++
2 files changed, 30 insertions(+)
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
index c092fbdab..bc0303010 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
@@ -27,6 +27,8 @@ import org.apache.paimon.types.RowType;
import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
@@ -49,6 +51,7 @@ public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
}
public void setFlussRecord(LogRecord logRecord) {
+ checkNotNull(logRecord, "logRecord must not be null.");
this.logRecord = logRecord;
this.internalRow = logRecord.getRow();
int flussFieldCount = internalRow.getFieldCount();
@@ -74,11 +77,13 @@ public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
@Override
public RowKind getRowKind() {
+ checkState(logRecord != null, "setFlussRecord() must be called before accessing the row.");
return toRowKind(logRecord.getChangeType());
}
@Override
public boolean isNullAt(int pos) {
+ checkState(logRecord != null, "setFlussRecord() must be called before accessing the row.");
if (pos < originRowFieldCount) {
return super.isNullAt(pos);
}
@@ -96,6 +101,7 @@ public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
// bucket system column
return bucket;
}
+ checkState(logRecord != null, "setFlussRecord() must be called before accessing the row.");
if (pos >= originRowFieldCount) {
throw new IllegalStateException(
String.format(
@@ -107,6 +113,7 @@ public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
@Override
public long getLong(int pos) {
+ checkState(logRecord != null, "setFlussRecord() must be called before accessing the row.");
if (pos == offsetFieldIndex) {
// offset system column
return logRecord.logOffset();
@@ -126,6 +133,7 @@ public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
@Override
public Timestamp getTimestamp(int pos, int precision) {
+ checkState(logRecord != null, "setFlussRecord() must be called before accessing the row.");
// it's timestamp system column
if (pos == timestampFieldIndex) {
return Timestamp.fromEpochMillis(logRecord.timestamp());
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
index 15f617971..e085b694b 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
@@ -1071,4 +1071,26 @@ class FlussRecordAsPaimonRowTest {
assertThat(map).isNotNull();
assertThat(map.size()).isEqualTo(0);
}
+
+ @Test
+ void testAccessRowBeforeSetThrowsIllegalState() {
+ String expectedMsg = "setFlussRecord() must be called before accessing the row.";
+ RowType rowType =
+ RowType.of(
+ new org.apache.paimon.types.IntType(),
+ // system columns: __bucket, __offset, __timestamp
+ new org.apache.paimon.types.IntType(),
+ new org.apache.paimon.types.BigIntType(),
+ new org.apache.paimon.types.LocalZonedTimestampType(3));
+ FlussRecordAsPaimonRow row = new FlussRecordAsPaimonRow(0, rowType);
+ assertThatThrownBy(row::getRowKind)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(expectedMsg);
+ assertThatThrownBy(() -> row.isNullAt(0))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(expectedMsg);
+ assertThatThrownBy(() -> row.getLong(0))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(expectedMsg);
+ }
}