openinx opened a new issue #2571:
URL: https://github.com/apache/iceberg/issues/2571
I tried adding a few lines of code to read the Iceberg table after writing a
few equality delete records in format v2, as shown below. The read fails with a
NullPointerException. I think it's worth taking some time to dig into the
cause.
```patch
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index fd2a71ab..dfe4ec24 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
@@ -44,6 +45,7 @@ import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestTableLoader;
 import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.flink.source.FlinkSource;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -162,6 +164,16 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
     // Execute the program.
     env.execute("Test Iceberg Change-Log DataStream.");
 
+    DataStream<RowData> batch = FlinkSource.forRowData()
+        .env(env)
+        .tableLoader(tableLoader)
+        .streaming(false)
+        .build();
+
+    batch.print();
+
+    env.execute("Test Iceberg Change-Log Batch Read.");
+
     table.refresh();
     List<Snapshot> snapshots = findValidSnapshots(table);
     int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
```
```txt
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (1f110148951a8e6b9a131546ceffee9e) switched from RUNNING to FAILED on b6da4e08-c07d-4b4c-829d-5d76237be407 @ localhost (dataPort=-1).
java.lang.NullPointerException
    at org.apache.iceberg.data.DeleteFilter.pos(DeleteFilter.java:108)
    at org.apache.iceberg.deletes.Deletes$PositionSetDeleteFilter.shouldKeep(Deletes.java:157)
    at org.apache.iceberg.util.Filter$Iterator.shouldKeep(Filter.java:49)
    at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:67)
    at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:50)
    at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:65)
    at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:50)
    at org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator(DataIterator.java:100)
    at org.apache.iceberg.flink.source.DataIterator.hasNext(DataIterator.java:84)
    at org.apache.iceberg.flink.source.FlinkInputFormat.reachedEnd(FlinkInputFormat.java:104)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:89)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.