This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d9d8d8370 [avro] Writer throw more clear exception when NPE with
aggregation merge function (#4547)
d9d8d8370 is described below
commit d9d8d837028c63666ccdec4d22677a8d869a3d77
Author: yuzelin <[email protected]>
AuthorDate: Tue Nov 19 15:29:18 2024 +0800
[avro] Writer throw more clear exception when NPE with aggregation merge
function (#4547)
---
.../org/apache/paimon/io/SingleFileWriter.java | 23 ++++++++++++++----
.../paimon/flink/ContinuousFileStoreITCase.java | 27 ++++++++++++++++++++++
.../paimon/format/avro/AvroRowDatumWriter.java | 11 ++++++++-
3 files changed, 55 insertions(+), 6 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index d41040e05..f303e8597 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -49,7 +49,7 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
protected final Path path;
private final Function<T, InternalRow> converter;
- private final FormatWriter writer;
+ private FormatWriter writer;
private PositionOutputStream out;
private long recordCount;
@@ -144,7 +144,14 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
@Override
public void abort() {
- IOUtils.closeQuietly(out);
+ if (writer != null) {
+ IOUtils.closeQuietly(writer);
+ writer = null;
+ }
+ if (out != null) {
+ IOUtils.closeQuietly(out);
+ out = null;
+ }
fileIO.deleteQuietly(path);
}
@@ -167,9 +174,15 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
}
try {
- writer.close();
- out.flush();
- out.close();
+ if (writer != null) {
+ writer.close();
+ writer = null;
+ }
+ if (out != null) {
+ out.flush();
+ out.close();
+ out = null;
+ }
} catch (IOException e) {
LOG.warn("Exception occurs when closing file {}. Cleaning up.", path, e);
abort();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index cf97f7b67..2e1569751 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
@@ -43,6 +44,7 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -629,4 +631,29 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("10", "11", "12"));
iterator.close();
}
+
+ @Test
+ public void testAvroRetractNotNullField() {
+ List<Row> input =
+ Arrays.asList(
+ Row.ofKind(RowKind.INSERT, 1, "A"), Row.ofKind(RowKind.DELETE, 1, "A"));
+ String id = TestValuesTableFactory.registerData(input);
+ sEnv.executeSql(
+ String.format(
+ "CREATE TEMPORARY TABLE source (pk INT PRIMARY KEY NOT ENFORCED, a STRING) "
+ + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', "
+ + "'changelog-mode' = 'I,D,UA,UB')",
+ id));
+
+ sql(
+ "CREATE TABLE avro_sink (pk INT PRIMARY KEY NOT ENFORCED, a STRING NOT NULL) "
+ + " WITH ('file.format' = 'avro', 'merge-engine' = 'aggregation')");
+
+ assertThatThrownBy(
+ () -> sEnv.executeSql("INSERT INTO avro_sink select * from source").await())
+ .satisfies(
+ anyCauseMatches(
+ RuntimeException.class,
+ "Caught NullPointerException, the possible reason is you have set following options together"));
+ }
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
index c2bd81d00..d30245162 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
@@ -56,6 +56,15 @@ public class AvroRowDatumWriter implements DatumWriter<InternalRow> {
// top Row is a UNION type
out.writeIndex(1);
}
- this.writer.writeRow(datum, out);
+ try {
+ this.writer.writeRow(datum, out);
+ } catch (NullPointerException npe) {
+ throw new RuntimeException(
+ "Caught NullPointerException, the possible reason is you have set following options together:\n"
+ + " 1. file.format = avro;\n"
+ + " 2. merge-function = aggregation/partial-update;\n"
+ + " 3. some fields are not null.",
+ npe);
+ }
}
}