Hi folks,
I'm trying to test record deletions on a V2 table, but I seem to be
corrupting the table: reading it after the deletion throws a
NullPointerException.
I have a main table 'T1' containing 2 records, and another table 'T2' that
contains the 1 record from T1 that I want to delete. I create a DeleteFile
from T2 using the EqualityDeleteWriter, and then apply a RowDelta to T1 with
that DeleteFile. After this sequence, the following exception is thrown when
reading T1:
Caused by: java.lang.NullPointerException
    at org.apache.iceberg.DeleteFileIndex.canContainEqDeletesForFile(DeleteFileIndex.java:193)
    at org.apache.iceberg.DeleteFileIndex.canContainDeletesForFile(DeleteFileIndex.java:144)
    at org.apache.iceberg.DeleteFileIndex.lambda$forDataFile$3(DeleteFileIndex.java:134)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
    at org.apache.iceberg.DeleteFileIndex.forDataFile(DeleteFileIndex.java:135)
    at org.apache.iceberg.DeleteFileIndex.forEntry(DeleteFileIndex.java:115)
    at org.apache.iceberg.ManifestGroup.lambda$planFiles$6(ManifestGroup.java:181)
    at org.apache.iceberg.io.CloseableIterable$4$1.next(CloseableIterable.java:113)
    at org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.next(CloseableIterable.java:206)
    at org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
    at org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
    at org.apache.iceberg.relocated.com.google.common.collect.Iterators$ConcatenatedIterator.hasNext(Iterators.java:1330)
    at org.apache.iceberg.io.CloseableIterator$1.hasNext(CloseableIterator.java:50)
    at org.apache.iceberg.util.BinPacking$PackingIterator.hasNext(BinPacking.java:106)
    at org.apache.iceberg.io.CloseableIterator$1.hasNext(CloseableIterator.java:50)
    at org.apache.iceberg.io.CloseableIterable$4$1.hasNext(CloseableIterable.java:108)
    at org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
    at org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
    at org.apache.iceberg.relocated.com.google.common.collect.Iterators$ConcatenatedIterator.getTopMetaIterator(Iterators.java:1309)
    at org.apache.iceberg.relocated.com.google.common.collect.Iterators$ConcatenatedIterator.hasNext(Iterators.java:1325)
    at org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
    at org.apache.iceberg.io.CloseableIterable.concat(CloseableIterable.java:122)
    at org.apache.iceberg.data.GenericReader.open(GenericReader.java:65)
    at org.apache.iceberg.data.TableScanIterable.iterator(TableScanIterable.java:41)
    at com.gs.ep.da.lake.refinerlib.flink.iceberg.poc.IcebergApplyDeltas.logRecordsInTable(IcebergApplyDeltas.java:111)
I'm reading the table through the IcebergGenerics.read() API, which works
fine prior to the delete. Is there something wrong with this approach? I've
included my code snippet below.
// ...
Table deleteTable = catalog.createTable(deleteDeltaTableIdentifier, icebergSchema, null, properties);
DataStream<GenericRecord> recordsForDelete =
    readParquet(getStreamExecutionEnvironment(), avroSchema, deltaDeletesLocation);
// writeToTable() writes GenericRecords to the Table through the Iceberg Flink API
writeToTable(recordsForDelete, avroSchema, icebergSchema, getTableLoader(deleteTable), deleteTable);
// This Table contains the single record I'm looking to delete from dataTable, loaded below
deleteTable = catalog.loadTable(deleteDeltaTableIdentifier);

TableIdentifier dataTableName =
    TableIdentifier.of(icebergProofOfConceptNamespace, getIcebergTableName());
// Main Table consisting of 2 records, 1 to be deleted
Table dataTable = catalog.loadTable(dataTableName);

logger.info("Logging records before update");
logRecordsInTable(dataTable); // Works fine
logger.info("Logging records for deletion");
logRecordsInTable(deleteTable); // Works fine

OutputFileFactory fileFactory = OutputFileFactory.builderFor(dataTable, 1, 1).build();
OutputFile deleteOutputFile = fileFactory.newOutputFile().encryptingOutputFile();

logger.info("Performing deletes");
EqualityDeleteWriter<Record> deleteWriter = Parquet.writeDeletes(deleteOutputFile)
    .forTable(dataTable)
    .createWriterFunc(GenericParquetWriter::buildWriter)
    .overwrite()
    .equalityFieldIds(icebergSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray())
    .buildEqualityWriter();

try (Closeable closeable = deleteWriter;
     CloseableIterable<Record> deletes = IcebergGenerics.read(deleteTable).build()) {
    deleteWriter.write(deletes);
}

DeleteFile deletes = deleteWriter.toDeleteFile();
Transaction deltaTransaction = dataTable.newTransaction();
deltaTransaction.newRowDelta()
    .addDeletes(deletes)
    .commit();
deltaTransaction.commitTransaction();
logger.info("Transaction complete");

logger.info("Logging records post update");
logRecordsInTable(dataTable); // Results in the NPE
// ...
private void logRecordsInTable(Table dataTable) throws IOException {
    try (CloseableIterable<Record> postDeletionRes = IcebergGenerics.read(dataTable).build()) {
        List<String> columnNames = dataTable.schema().columns().stream()
            .map(Types.NestedField::name)
            .collect(Collectors.toList());
        for (Record res : postDeletionRes) {
            StringBuilder sb = new StringBuilder().append("[ ");
            columnNames.forEach(name -> sb.append(name)
                .append(": ")
                .append(res.getField(name))
                .append(", "));
            sb.append("]");
            logger.info(sb.toString());
        }
    }
}
best,
ah