[ 
https://issues.apache.org/jira/browse/BEAM-9873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17220396#comment-17220396
 ] 

Varun edited comment on BEAM-9873 at 10/25/20, 9:18 PM:
--------------------------------------------------------

{code:java}

// Read messages from the subscription with some windowing strategy.

final PCollection<String> readMessages =
    pipeline.apply("Read from Subscription", PubsubIO.readStrings().apply()....)

// Apply a Filter transform to remove invalid JSON messages.

PCollection<String> validMessages =
    readMessages.apply("Remove invalid messages", Filter.by(Validate.IS_VALID));

// Enum definition.
// jsonMapper is assumed to be a Jackson ObjectMapper and LOG an SLF4J Logger,
// both defined elsewhere.

enum Validate implements SerializableFunction<String, Boolean> {
    IS_VALID {
        @Override
        public Boolean apply(String input) {
            try {
                // readTree throws if the input is not well-formed JSON.
                jsonMapper.readTree(input);
                return true;
            } catch (IOException ex) {
                // Log the rejected message and drop it from the PCollection.
                LOG.warn(input);
                return false;
            }
        }
    }
}
{code}

This enum could be defined in the Filter class (org.apache.beam.sdk.transforms.Filter).


was (Author: varun.sharma):

{code:java}

// Read messages from Subscription with some windowing strategy

final PCollection<String> readMessages =
    pipeline.apply("Read from Subscription", PubsubIO.readStrings().apply()....)

// Apply filter transformation to remove invalid jsons

PCollection<String> validMessages =
    readMessages.apply("Remove invalid messages", Filter.by(Validate.IS_VALID));
        

// Enum definition. 

    enum Validate implements SerializableFunction<String, Boolean> {
        IS_VALID {
            @Override
            public Boolean apply(String input) {
                try {
                    jsonMapper.readTree(input);
                    return true;
                } catch (IOException ex) {
                    LOG.warn(input);
                    return false;
                }
            }
        }
{code}

This enum can be defined in Filter class (org.apache.beam.sdk.transforms.Filter)

> Removing Invalid JSON messages from PCollection before starting BigQueryIO 
> Operations
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-9873
>                 URL: https://issues.apache.org/jira/browse/BEAM-9873
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Varun
>            Assignee: Varun
>            Priority: P4
>              Labels: Clarified, features
>
> In a typical setup of Pub/Sub and Cloud Dataflow, a Pub/Sub subscriber might
> receive some messages that do not follow a valid JSON structure. The BigQuery
> insert operation fails to process these messages, and the worker may be
> terminated if the exception is not handled correctly.
> Invalid JSON messages are unlikely, and the upstream component publishing to
> the topic should validate messages at its end. This is not always the case,
> however, and the application should be robust enough to survive even when
> upstream components push malformed messages.
> I have created an enum that acts as a predicate in the Filter transform.
> Validating JSON this way is standard logic, and I would like to add it to the
> Java SDK (and Python) in the Filter transform.
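
The idea in the description above, turning a parser that throws on bad input
into a boolean predicate so that malformed elements are dropped instead of
failing the worker, can be sketched without Beam or Jackson. This is only a
dependency-free illustration under stated assumptions: ParseFilterSketch,
ThrowingParser, parsesWith and keepValid are hypothetical names that exist in
neither library, java.util.function.Predicate stands in for Beam's
SerializableFunction, and Integer.parseInt is merely a placeholder for a real
JSON parser such as the readTree call in the comment.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Dependency-free sketch; every name below is hypothetical and not part of Beam.
final class ParseFilterSketch {

    /** A parser that may throw on malformed input (stand-in for a JSON parser). */
    @FunctionalInterface
    interface ThrowingParser {
        void parse(String input) throws Exception;
    }

    /** Wraps a throwing parser as a predicate: true if parsing succeeds, false otherwise. */
    static Predicate<String> parsesWith(ThrowingParser parser) {
        return input -> {
            try {
                parser.parse(input);
                return true;
            } catch (Exception ex) {
                // Malformed element: report it as invalid instead of propagating.
                return false;
            }
        };
    }

    /** Keeps only the elements the parser accepts, preserving their order. */
    static List<String> keepValid(List<String> inputs, ThrowingParser parser) {
        return inputs.stream().filter(parsesWith(parser)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Integer.parseInt is only a placeholder for a real JSON parser.
        List<String> kept = keepValid(Arrays.asList("1", "oops", "42"), Integer::parseInt);
        System.out.println(kept);
    }
}
```

The predicate swallows the exception on purpose, so rejected elements vanish
silently unless they are logged or routed elsewhere, as the LOG.warn call in
the comment does.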



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
