[ https://issues.apache.org/jira/browse/BEAM-9873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17220396#comment-17220396 ]
Varun edited comment on BEAM-9873 at 10/25/20, 9:18 PM:
--------------------------------------------------------
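{code:java}
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Read messages from the subscription with some windowing strategy (details elided)
final PCollection<String> readMessages = pipeline.apply("Read from Subscription",
    PubsubIO.readStrings().fromSubscription(...));
// Apply a Filter transform to remove invalid JSON messages
PCollection<String> validMessages = readMessages.apply("Remove invalid messages",
    Filter.by(Validate.IS_VALID));

// Enum definition: each constant is a serializable predicate usable with Filter.by.
enum Validate implements SerializableFunction<String, Boolean> {
  IS_VALID {
    @Override
    public Boolean apply(String input) {
      try {
        jsonMapper.readTree(input); // throws on malformed JSON
        return true;
      } catch (IOException ex) {
        LOG.warn("Dropping invalid JSON message: {}", input);
        return false;
      }
    }
  };

  // Created once per JVM and shared by all constants; enum serialization only writes the constant's name.
  private static final ObjectMapper jsonMapper = new ObjectMapper();
  private static final Logger LOG = LoggerFactory.getLogger(Validate.class);
}
{code}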
This enum could be defined in the Filter class (org.apache.beam.sdk.transforms.Filter).
was (Author: varun.sharma): an earlier revision of the same snippet and note, identical except that the Validate enum was missing its closing brace.
> Removing Invalid JSON messages from PCollection before starting BigQueryIO Operations
> -------------------------------------------------------------------------------------
>
> Key: BEAM-9873
> URL: https://issues.apache.org/jira/browse/BEAM-9873
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Varun
> Assignee: Varun
> Priority: P4
> Labels: Clarified, features
>
> In a typical Pub/Sub and Cloud Dataflow setup, a Pub/Sub subscriber might
> receive messages that are not valid JSON. The BigQuery insert operation fails
> to process these messages, and the worker may be terminated if the exception
> is not handled correctly.
> Invalid JSON messages are rare, and the upstream component publishing to the
> topic should validate them on its end. That is not always the case, however,
> so the application should be robust enough to survive malformed messages
> pushed by upstream publishers.
> I have created an enum that acts as the predicate for a Filter transform. This
> is very standard JSON-validation logic, and I would like to add it to the Filter
> transform in the Java SDK (and the Python SDK).
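The enum-as-predicate idea above can be tried without Beam or Jackson on the classpath. Below is a hedged, JDK-only sketch under stated assumptions: SerializableFunction is re-declared locally as a stand-in for Beam's interface, and the hypothetical JsonSyntax class replaces jsonMapper.readTree with a minimal recursive-descent syntax check. It only checks syntax (it builds no tree) and treats any non-whitespace content after the top-level value as invalid. It illustrates the pattern; it is not a proposed Filter API.

```java
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical local stand-in for Beam's SerializableFunction, so this compiles with the JDK alone.
interface SerializableFunction<InputT, OutputT> extends Function<InputT, OutputT>, Serializable {}

// Same shape as the Validate enum in the comment: the constant's body supplies apply().
enum Validate implements SerializableFunction<String, Boolean> {
  IS_VALID {
    @Override
    public Boolean apply(String input) {
      return input != null && new JsonSyntax(input).isValid();
    }
  }
}

// Hypothetical minimal recursive-descent JSON syntax checker, standing in for jsonMapper.readTree.
final class JsonSyntax {
  private final String s;
  private int i = 0;
  JsonSyntax(String s) { this.s = s; }

  // Valid only if one complete JSON value is followed by nothing but whitespace.
  boolean isValid() {
    try { value(); ws(); return i == s.length(); } catch (IllegalStateException e) { return false; }
  }

  private void fail(String why) { throw new IllegalStateException(why); }
  private boolean at(char c) { return i < s.length() && s.charAt(i) == c; }
  private void ws() { while (i < s.length() && " \t\r\n".indexOf(s.charAt(i)) >= 0) i++; }
  private char peek() { if (i >= s.length()) fail("unexpected end of input"); return s.charAt(i); }
  private void expect(char c) { if (peek() != c) fail("expected " + c); i++; }

  private void value() {
    ws();
    char c = peek();
    if (c == '{') sequence('{', '}', () -> { ws(); string(); ws(); expect(':'); value(); });
    else if (c == '[') sequence('[', ']', this::value);
    else if (c == '"') string();
    else if (c == '-' || (c >= '0' && c <= '9')) number();
    else literal();
  }

  // Parses `open`, then zero or more comma-separated items, then `close`.
  private void sequence(char open, char close, Runnable item) {
    expect(open);
    ws();
    if (at(close)) { i++; return; }
    while (true) {
      item.run();
      ws();
      if (!at(',')) break;
      i++; // consume ',' and parse the next item
    }
    expect(close);
  }

  private void string() {
    expect('"');
    while (true) {
      char c = peek();
      i++;
      if (c == '"') return;
      if (c < 0x20) fail("unescaped control character");
      if (c != '\\') continue;
      char e = peek();
      i++;
      if (e == 'u') {
        for (int k = 0; k < 4; k++, i++) {
          if ("0123456789abcdefABCDEF".indexOf(peek()) < 0) fail("bad unicode escape");
        }
      } else if ("\"\\/bfnrt".indexOf(e) < 0) {
        fail("bad escape");
      }
    }
  }

  private void number() {
    if (at('-')) i++;
    if (at('0')) { i++; } else { digits(); }
    if (at('.')) { i++; digits(); }
    if (at('e') || at('E')) { i++; if (at('+') || at('-')) i++; digits(); }
  }
  private void digits() {
    int start = i;
    while (i < s.length() && s.charAt(i) >= '0' && s.charAt(i) <= '9') i++;
    if (i == start) fail("digit expected");
  }
  private void literal() {
    for (String lit : new String[] {"true", "false", "null"}) {
      if (s.startsWith(lit, i)) { i += lit.length(); return; }
    }
    fail("unexpected token");
  }
}
```

In plain Java, messages.stream().filter(Validate.IS_VALID::apply) keeps the same elements that Filter.by(Validate.IS_VALID) would keep in a pipeline. An enum constant is a convenient carrier for the function because Java serialization restores the existing singleton rather than creating a copy, and the function holds no mutable state.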
--
This message was sent by Atlassian Jira
(v8.3.4#803005)