avkarthk opened a new issue, #32075:
URL: https://github.com/apache/beam/issues/32075

   ### What happened?
   
   **Exception trace:**
   java.lang.IllegalArgumentException: Type of @Element must match the DoFn typeFiltering Failed Redemption status/ParMultiDo(FilterUsersBasedOnStatus).output [PCollection@1605886859]
       at org.apache.beam.sdk.transforms.ParDo.getDoFnSchemaInformation(ParDo.java:654)
       at org.apache.beam.sdk.util.construction.graph.FieldAccessVisitor.getFieldAccess(FieldAccessVisitor.java:76)
       at org.apache.beam.sdk.util.construction.graph.FieldAccessVisitor.visitPrimitiveTransform(FieldAccessVisitor.java:48)
       at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
       at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
       at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
       at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
       at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
       at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:477)
       at org.apache.beam.sdk.util.construction.graph.ProjectionPushdownOptimizer.optimize(ProjectionPushdownOptimizer.java:63)
       at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:83)
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
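
The message in the trace reads as a complaint that the type declared on an `@Element` parameter differs from the element type recorded for the named PCollection. As a rough illustration of that general kind of check, here is a minimal sketch in plain Java reflection. It is not Beam's implementation: `ElementTypeCheckSketch`, `Fn`, `UserDetails`, and `checkElementType` are hypothetical names invented for this example.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ElementTypeCheckSketch {

    /** Hypothetical stand-in for a generic processing function. */
    abstract static class Fn<InputT> {
        /**
         * Recovers the type argument written in the subclass declaration.
         * Only works for direct subclasses such as new Fn<X>() {}.
         */
        Type declaredInputType() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    /** Hypothetical element class, standing in for a user-defined type. */
    static class UserDetails {}

    /** Rejects a function whose declared input type differs from the collection's type. */
    static void checkElementType(Fn<?> fn, Type collectionType) {
        Type declared = fn.declaredInputType();
        if (!declared.equals(collectionType)) {
            throw new IllegalArgumentException(
                    "Declared element type " + declared.getTypeName()
                            + " does not match collection type " + collectionType.getTypeName());
        }
    }

    public static void main(String[] args) {
        Fn<UserDetails> fn = new Fn<UserDetails>() {};

        // Same type on both sides: the check passes silently.
        checkElementType(fn, UserDetails.class);

        // Different type recorded for the collection: the check throws.
        try {
            checkElementType(fn, Object.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If the real check behaves similarly, comparing the type descriptor of the PCollection named in the message with the `@Element` type of the DoFn that consumes it may help narrow down the mismatch.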
   
   
   **Code snippet:**
           PCollection<KV<String, List<BatchUserDetails>>> innerKVs =
                   redeemAccessCode.apply("Extracting AC KV", Values.create());

           PCollection<List<BatchUserDetails>> listsOfBatchUserDetails =
                   innerKVs.apply("Extracting BUD", Values.create());

           listsOfBatchUserDetails.apply("Log List of BatchUserDetails",
                   ParDo.of(new DoFn<List<BatchUserDetails>, Void>() {
               @ProcessElement
               public void processElement(@Element List<BatchUserDetails> element,
                                          OutputReceiver<Void> out) {
                   System.out.println("List<BatchUserDetails>: " + element);
               }
           }));

           PCollection<BatchUserDetails> redeemAccessCodeValues = listsOfBatchUserDetails
                   .apply("Flatten Redemption BatchUserDetails", Flatten.<BatchUserDetails>iterables())
                   .setTypeDescriptor(TypeDescriptor.of(BatchUserDetails.class));

           redeemAccessCodeValues.apply("Log BatchUserDetails",
                   ParDo.of(new DoFn<BatchUserDetails, Void>() {
               @ProcessElement
               public void processElement(@Element BatchUserDetails element,
                                          OutputReceiver<Void> out) {
                   logger.info("Element instance of {}", element.getClass().getDeclaredFields());
                   System.out.println("BatchUserDetails: " + element);
               }
           }));
   
   Adding the lines below causes the exception above to be thrown:
           PCollection<BatchUserDetails> failedFilteredRedeemAccessCodeValues =
                   redeemAccessCodeValues.apply(
                           "Filtering Failed Redemption status",
                           ParDo.of(new FilterUsersBasedOnStatusFn(
                                   TaskStatus.ACCESS_CODE_REDEMPTION_FAILED.getStatus())));
   
   **FilterUsersBasedOnStatusFn Implementation:**
   import org.apache.beam.sdk.transforms.DoFn;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class FilterUsersBasedOnStatusFn extends DoFn<BatchUserDetails, BatchUserDetails> {

       private static final Logger logger = LoggerFactory.getLogger(FilterUsersBasedOnStatusFn.class);
       private final String status;

       public FilterUsersBasedOnStatusFn(String status) {
           this.status = status;
       }

       @ProcessElement
       public void processElement(@Element BatchUserDetails batchUserDetails,
                                  OutputReceiver<BatchUserDetails> filteredUserDetails) {
           String batchId = batchUserDetails.getBatchId();

           // && short-circuits, so equalsIgnoreCase is never called on a null status.
           if (status != null
                   && status.equalsIgnoreCase(batchUserDetails.getTaskDetails().getTaskResponse().getStatus())) {
               filteredUserDetails.output(batchUserDetails);
           }
       }

   }
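
The status comparison inside `processElement` can be exercised without a pipeline. Below is a minimal sketch, assuming a hypothetical helper class `StatusMatcher` that is not part of the original code. It uses the short-circuit `&&` operator so that a null configured status never reaches `equalsIgnoreCase`.

```java
public class StatusMatcher {

    /**
     * Returns true only when wanted is non-null and equals actual ignoring case.
     * A null actual value yields false because equalsIgnoreCase(null) is false.
     */
    static boolean matches(String wanted, String actual) {
        return wanted != null && wanted.equalsIgnoreCase(actual);
    }

    public static void main(String[] args) {
        System.out.println(matches("FAILED", "failed")); // true
        System.out.println(matches(null, "failed"));     // false
        System.out.println(matches("FAILED", null));     // false
    }
}
```

With the non-short-circuit `&` operator, the second call would instead throw a `NullPointerException`, because both operands are always evaluated.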
   
   The application is deployed on AWS as a managed Flink service.
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [X] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

