damccorm opened a new issue, #20842:
URL: https://github.com/apache/beam/issues/20842

   Hi Team,
   
   We are getting the following error:
   
   `org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV`
   
   Our goal is to load a file into a database. We tried the following approach:
   ```
   @SuppressWarnings("unchecked")
   public static void main(String[] args) {
       PCSI02AOptions options =
               PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PCSI02AOptions.class);
       Pipeline p = Pipeline.create(options);

       PCollection data1 = p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
               .apply(ParDo.of(new GetRatePlanID()))
               .apply("Format Result",
                       MapElements.into(TypeDescriptors.strings())
                               .via((KV<String, Integer> ABC) -> ABC.getKey() + "," + +ABC.getValue()));

       data1.apply(JdbcIO.<KV<String, Iterable<Integer>>, String>readAll()
               .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                       .create("com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/ABC")
                       .withUsername("abc")
                       .withPassword("abc123"))
               .withCoder(StringUtf8Coder.of())
               .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<Integer>>>() {
                   @Override
                   public void setParameters(KV<String, Iterable<Integer>> element,
                           PreparedStatement preparedStatement) throws Exception {
                       String[] range = element.getKey().split(",");
                       preparedStatement.setInt(1, Integer.parseInt(range[0]));
                   }
               })
               .withQuery("select * from ABC.PAY_PLAN_INFO where plan_key = ?")
               .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
                   ObjectMapper mapper = new ObjectMapper();
                   ArrayNode arrayNode = mapper.createArrayNode();
                   for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
                       try {
                           ObjectNode objectNode = mapper.createObjectNode();
                           objectNode.put("column_name", resultSet.getMetaData().getColumnName(i));
                           objectNode.put("value", resultSet.getString(i));
                           arrayNode.add(objectNode);
                       } catch (Exception e) {
                           throw e;
                       }
                   }
                   return mapper.writeValueAsString(arrayNode);
               })
       );

       State result = p.run().waitUntilFinish();
       System.out.println(result);
   }

   private static class GetPlanID extends DoFn<String, KV<String, Integer>> {
       @ProcessElement
       public void processElement(ProcessContext c) {
           String[] data = c.element().split(",");
           Integer plankey = Integer.parseInt(data[0]);
           String planid = data[1];
           c.output(KV.of(planid, plankey));
       }
   }
   ```
   
    
   
   Error:
   ```
   Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
       at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371)
       at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339)
       at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219)
       at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
       at com.loblaw.pcinsiders.jobflow.FiletoDB.main(FiletoDB.java:120)
   Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
       at com.loblaw.pcinsiders.jobflow.FiletoDB$1.setParameters(FiletoDB.java:1)
       at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:910)
   ```
   
    
   Could you suggest how we can resolve this? If there is a reference or example for this use case, please share a link or a snippet.
   
   Imported from Jira [BEAM-12005](https://issues.apache.org/jira/browse/BEAM-12005). Original Jira may contain additional context.
   Reported by: khgaura.
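
A possible reading of the stack trace, offered as a sketch rather than a confirmed diagnosis: the "Format Result" step emits `String` elements, while `JdbcIO.readAll()` is parameterized with `KV<String, Iterable<Integer>>`. Because `data1` is declared as a raw `PCollection` and the method carries `@SuppressWarnings("unchecked")`, the compiler cannot flag the mismatch, so the cast fails only at runtime when `setParameters` is invoked. The plain-Java example below reproduces the same mechanism without Beam. `RawTypeCastDemo`, `Pair`, `formatStep`, and `failsWithClassCast` are hypothetical names: `Pair` stands in for `KV`, and a raw `List` stands in for the raw `PCollection`.

```java
import java.util.ArrayList;
import java.util.List;

public class RawTypeCastDemo {

    /** Hypothetical stand-in for Beam's KV; this is not the real class. */
    public static final class Pair<K, V> {
        private final K key;
        private final V value;

        public Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }

        public K getKey() {
            return key;
        }

        public V getValue() {
            return value;
        }
    }

    /** Mirrors the "Format Result" step: turns a key/value pair into a String. */
    public static String formatStep(Pair<String, Integer> kv) {
        return kv.getKey() + "," + kv.getValue();
    }

    /**
     * Stores a String in a raw list, then reads it back through a typed view.
     * Returns true if the read fails with ClassCastException.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static boolean failsWithClassCast() {
        List raw = new ArrayList();                    // raw type, like the raw PCollection
        raw.add(formatStep(new Pair<>("planA", 7)));   // the element is now a String

        // Unchecked conversion: compiles because the warning is suppressed.
        List<Pair<String, Integer>> typed = raw;

        try {
            // The compiler inserts a cast to Pair here; the element is a String.
            Pair<String, Integer> element = typed.get(0);
            element.getKey();
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ClassCastException raised: " + failsWithClassCast());
    }
}
```

If this reading is right, declaring `data1` with its element type (for example `PCollection<String>`) should turn the runtime failure into a compile-time error at the `readAll()` call. Either dropping the "Format Result" step, or typing `readAll()` and the parameter setter on `String`, would then make the element types agree.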

