avkarthk opened a new issue, #32075:
URL: https://github.com/apache/beam/issues/32075
### What happened?
**Exception trace:**
```
java.lang.IllegalArgumentException: Type of @Element must match the DoFn typeFiltering Failed Redemption status/ParMultiDo(FilterUsersBasedOnStatus).output [PCollection@1605886859]
    at org.apache.beam.sdk.transforms.ParDo.getDoFnSchemaInformation(ParDo.java:654)
    at org.apache.beam.sdk.util.construction.graph.FieldAccessVisitor.getFieldAccess(FieldAccessVisitor.java:76)
    at org.apache.beam.sdk.util.construction.graph.FieldAccessVisitor.visitPrimitiveTransform(FieldAccessVisitor.java:48)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:477)
    at org.apache.beam.sdk.util.construction.graph.ProjectionPushdownOptimizer.optimize(ProjectionPushdownOptimizer.java:63)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:83)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
    ...
```
**Code Snippet:**

```java
PCollection<KV<String, List<BatchUserDetails>>> innerKVs =
    redeemAccessCode.apply("Extracting AC KV", Values.create());

PCollection<List<BatchUserDetails>> listsOfBatchUserDetails =
    innerKVs.apply("Extracting BUD", Values.create());

listsOfBatchUserDetails.apply("Log List of BatchUserDetails",
    ParDo.of(new DoFn<List<BatchUserDetails>, Void>() {
      @ProcessElement
      public void processElement(@Element List<BatchUserDetails> element,
                                 OutputReceiver<Void> out) {
        System.out.println("List<BatchUserDetails>: " + element);
      }
    }));

PCollection<BatchUserDetails> redeemAccessCodeValues =
    listsOfBatchUserDetails
        .apply("Flatten Redemption BatchUserDetails",
            Flatten.<BatchUserDetails>iterables())
        .setTypeDescriptor(TypeDescriptor.of(BatchUserDetails.class));

redeemAccessCodeValues.apply("Log BatchUserDetails",
    ParDo.of(new DoFn<BatchUserDetails, Void>() {
      @ProcessElement
      public void processElement(@Element BatchUserDetails element,
                                 OutputReceiver<Void> out) {
        logger.info("Element instance of {}", element.getClass().getDeclaredFields());
        System.out.println("BatchUserDetails: " + element);
      }
    }));
```
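To make the data shape in the snippet above explicit: the two `Values.create()` steps drop the outer and inner keys, and `Flatten.iterables()` then emits each item of every `List<BatchUserDetails>` as its own element. The sketch below is a plain-Java, in-memory analogue of that chain using `java.util.stream`, not Beam code. The `FlattenAnalogue` class, the simplified `BatchUserDetails` stand-in, the `String` outer key of `redeemAccessCode`, and the sample data are all hypothetical, since the issue does not show them.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FlattenAnalogue {
    // Hypothetical stand-in for the application's BatchUserDetails class.
    static final class BatchUserDetails {
        final String batchId;
        final String status;

        BatchUserDetails(String batchId, String status) {
            this.batchId = batchId;
            this.status = status;
        }
    }

    // In-memory analogue of: Values.create() -> Values.create() -> Flatten.iterables().
    // Assumption: the outer key of redeemAccessCode is a String (not shown in the issue).
    static List<BatchUserDetails> flatten(
            List<Map.Entry<String, Map.Entry<String, List<BatchUserDetails>>>> redeemAccessCode) {
        return redeemAccessCode.stream()
                .map(Map.Entry::getValue)   // "Extracting AC KV": keep the inner KV
                .map(Map.Entry::getValue)   // "Extracting BUD": keep the List<BatchUserDetails>
                .flatMap(List::stream)      // "Flatten Redemption BatchUserDetails": one element per item
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        BatchUserDetails a = new BatchUserDetails("b1", "FAILED");
        BatchUserDetails b = new BatchUserDetails("b1", "SUCCESS");
        BatchUserDetails c = new BatchUserDetails("b2", "FAILED");
        List<Map.Entry<String, Map.Entry<String, List<BatchUserDetails>>>> input = List.of(
                Map.entry("k1", Map.entry("code1", List.of(a, b))),
                Map.entry("k2", Map.entry("code2", List.of(c))));
        System.out.println(flatten(input).size()); // prints 3
    }
}
```

Unlike this list-based analogue, a Beam `PCollection` is unordered, so only the contents (not the order) of the flattened output correspond.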
Adding the following lines causes the exception trace above to be thrown:

```java
PCollection<BatchUserDetails> failedFilteredRedeemAccessCodeValues =
    redeemAccessCodeValues.apply(
        "Filtering Failed Redemption status",
        ParDo.of(new FilterUsersBasedOnStatusFn(
            TaskStatus.ACCESS_CODE_REDEMPTION_FAILED.getStatus())));
```
**FilterUsersBasedOnStatusFn Implementation:**
```java
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FilterUsersBasedOnStatusFn extends DoFn<BatchUserDetails, BatchUserDetails> {
  private static final Logger logger =
      LoggerFactory.getLogger(FilterUsersBasedOnStatusFn.class);

  private final String status;

  public FilterUsersBasedOnStatusFn(String status) {
    this.status = status;
  }

  @ProcessElement
  public void processElement(@Element BatchUserDetails batchUserDetails,
                             OutputReceiver<BatchUserDetails> filteredUserDetails) {
    String batchId = batchUserDetails.getBatchId();
    // Short-circuit && so that a null status never reaches equalsIgnoreCase().
    if (status != null
        && status.equalsIgnoreCase(
            batchUserDetails.getTaskDetails().getTaskResponse().getStatus())) {
      filteredUserDetails.output(batchUserDetails);
    }
  }
}
```
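A side note on the filter condition, separate from the reported `IllegalArgumentException`: a condition written with the non-short-circuit `&` operator, as in `status != null & status.equalsIgnoreCase(...)`, evaluates both operands, so a `null` status still reaches `equalsIgnoreCase` and throws a `NullPointerException`. The short-circuit `&&` skips the right operand in that case. The plain-Java sketch below isolates just the predicate; the `StatusPredicate` class, the `matchesStrict`/`matchesSafe` helper names, and the `"FAILED"` value are hypothetical, and the task status is passed as a plain `String` instead of the `BatchUserDetails` accessor chain.

```java
public class StatusPredicate {
    // Non-short-circuit '&': both operands are always evaluated,
    // so a null status reaches status.equalsIgnoreCase(...) and throws NullPointerException.
    static boolean matchesStrict(String status, String taskStatus) {
        return status != null & status.equalsIgnoreCase(taskStatus);
    }

    // Short-circuit '&&': the right operand is skipped when status is null.
    static boolean matchesSafe(String status, String taskStatus) {
        return status != null && status.equalsIgnoreCase(taskStatus);
    }

    public static void main(String[] args) {
        System.out.println(matchesSafe("failed", "FAILED")); // true
        System.out.println(matchesSafe(null, "FAILED"));     // false
        try {
            matchesStrict(null, "FAILED");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException with '&'");
        }
    }
}
```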
The application is deployed on AWS as a Managed Flink Service.
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [X] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner