This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c39aff1771c05bbdb3789bf490c8d8cb366118a9 Author: Roman Khachatryan <[email protected]> AuthorDate: Tue Sep 30 00:33:33 2025 +0200 [hotfix] Introduce ProjectedRowData.isNullAtNonProjected --- .../flink/table/data/utils/ProjectedRowData.java | 24 +++++++++++++++++++++- .../table/data/utils/ProjectedRowDataTest.java | 12 +++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java index 0916ea6d37d..8e9e74e20ba 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java @@ -45,11 +45,17 @@ import java.util.Arrays; public class ProjectedRowData implements RowData { private final int[] indexMapping; + private final boolean isNullAtNonProjected; private RowData row; private ProjectedRowData(int[] indexMapping) { + this(indexMapping, false); + } + + protected ProjectedRowData(int[] indexMapping, boolean isNullAtNonProjected) { this.indexMapping = indexMapping; + this.isNullAtNonProjected = isNullAtNonProjected; } /** @@ -82,7 +88,8 @@ public class ProjectedRowData implements RowData { @Override public boolean isNullAt(int pos) { - return row.isNullAt(indexMapping[pos]); + return (pos >= indexMapping.length && isNullAtNonProjected) + || row.isNullAt(indexMapping[pos]); } @Override @@ -226,6 +233,21 @@ public class ProjectedRowData implements RowData { return new ProjectedRowData(projection); } + /** + * Create an empty {@link ProjectedRowData} starting from a {@code projection} array with nulls + * allowed for non-projected fields. + * + * <p>The array represents the mapping of the fields of the original {@link DataType}. For + * example, {@code [0, 2, 1]} specifies to include in the following order the 1st field, the 3rd + * field and the 2nd field of the row. + * + * @see Projection + * @see ProjectedRowData + */ + public static ProjectedRowData from(int[] projection, boolean isNullAtNonProjected) { + return new ProjectedRowData(projection, isNullAtNonProjected); + } + /** * Create an empty {@link ProjectedRowData} starting from a {@link Projection}. * diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/ProjectedRowDataTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/ProjectedRowDataTest.java index 42f4a5b0637..b730a26841f 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/ProjectedRowDataTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/ProjectedRowDataTest.java @@ -27,6 +27,8 @@ import org.junit.jupiter.api.Test; import static org.apache.flink.table.test.TableAssertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; /** Tests for {@link ProjectedRowData}. */ class ProjectedRowDataTest { @@ -67,4 +69,14 @@ class ProjectedRowDataTest { })) .isInstanceOf(IllegalArgumentException.class); } + + @Test + void testIsNullAtNonProjected() { + RowData initialRow = GenericRowData.of(-1L, -1L, 3L, -1L, 5L); + RowData projected = ProjectedRowData.from(new int[] {2, 4}, true).replaceRow(initialRow); + assertFalse(projected.isNullAt(0)); // 3L, projected + assertFalse(projected.isNullAt(1)); // 5L, projected + assertTrue(projected.isNullAt(2)); // not projected + assertTrue(projected.isNullAt(3)); // not projected and doesn't exist in the original + } }
