sydneyhoran commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1524068006
My team is trying to develop a custom Transformer class that skips the
null (tombstone) records coming from the PostgresDebezium Kafka source, to address this. We
are attempting something along the lines of:
```java
import java.util.Objects;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TombstoneTransformer implements Transformer {

  private static final Logger LOG = LoggerFactory.getLogger(TombstoneTransformer.class);

  @Override
  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
                            Dataset<Row> rowDataset, TypedProperties properties) {
    LOG.info("TombstoneTransformer " + rowDataset);

    // NullPointerException happens on the following line:
    // List<Row> rowList = rowDataset.collectAsList();

    // NullPointerException happens on the following line:
    // newRowSet.collectAsList().stream().limit(50).forEach(x -> LOG.info(x.toString()));

    // Later in DeltaSync, tombstone records appear to still be present
    // and result in a NullPointerException:
    // Dataset<Row> newRowSet = rowDataset.filter("_change_operation_type is not null");

    // Later in DeltaSync, tombstone records appear to still be present
    // and result in a NullPointerException.
    // (The cast is needed because a bare Objects::nonNull is ambiguous
    // between the FilterFunction and scala.Function1 overloads of filter.)
    Dataset<Row> newRowSet = rowDataset.filter((FilterFunction<Row>) Objects::nonNull);
    return newRowSet;
  }
}
```
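To illustrate why we suspect the `Objects::nonNull` filter is a no-op: a flattened tombstone presumably arrives as a Row object that is itself a non-null reference but has all-null fields, so a reference-level null check never drops it. Here is a plain-Java sketch of those semantics (`FakeRow` is a hypothetical stand-in for Spark's `Row`, not anything from Hudi or Spark):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class NonNullFilterDemo {

    // Hypothetical stand-in for a flattened tombstone record: the object
    // reference is non-null even though every field inside it is null.
    static class FakeRow {
        final String id;
        FakeRow(String id) { this.id = id; }
    }

    static final List<FakeRow> ROWS =
        Arrays.asList(new FakeRow("a"), new FakeRow(null));

    // Mirrors rowDataset.filter(Objects::nonNull): tests the reference only.
    static long countAfterObjectFilter() {
        return ROWS.stream().filter(Objects::nonNull).count();
    }

    // Mirrors a field-level filter such as "id is not null".
    static long countAfterFieldFilter() {
        return ROWS.stream().filter(r -> r.id != null).count();
    }

    public static void main(String[] args) {
        // The reference-level filter keeps both rows, tombstone included.
        System.out.println("object filter kept: " + countAfterObjectFilter());
        // The field-level filter actually drops the tombstone.
        System.out.println("field filter kept: " + countAfterFieldFilter());
    }
}
```

This mirrors what we believe is happening, though we have not confirmed whether the rows handed to the Transformer are genuinely non-null references with null fields or something more malformed.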
However, none of these attempts at filtering rowDataset gets rid of the
NullPointerException later in the DeltaStreamer ingestion. Moreover, many of the
attempts to log or inspect individual records in rowDataset themselves throw a
NullPointerException. So we are wondering if something earlier in the pipeline
(maybe PostgresDebeziumSource.java, which flattens the messages?) is allowing
malformed Row objects to be passed to custom Transformer classes, which would
explain why we cannot read or access the Rows and filter out the ones that are
null (tombstone) records.
Does anyone have an idea for how to make this class work? Side note:
we also tried SqlQueryBasedTransformer with "SELECT * FROM <SRC> a WHERE a.id
is not null" and it also did not filter out the tombstones (we still got the NPE
later during ingestion).
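For completeness, the SqlQueryBasedTransformer attempt was wired up roughly as follows (column name `id` matches the query quoted above; everything else here is just how we launched DeltaStreamer):

```properties
# DeltaStreamer was launched with:
#   --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer
# and this property supplies the query; <SRC> is substituted by the
# framework with the name of the temp view over the incoming batch.
hoodie.deltastreamer.transformer.sql=SELECT * FROM <SRC> a WHERE a.id is not null
```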
Could someone explain what happens with delete operations in
`PostgresDebeziumSource` and `PostgresDebeziumAvroPayload`, and why tombstones
potentially aren't being handled well? Thanks in advance!