[ 
https://issues.apache.org/jira/browse/BEAM-12005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17549235#comment-17549235
 ] 

Danny McCormick commented on BEAM-12005:
----------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/20842

> getting issue to load file into database (java.lang.ClassCastException: 
> java.lang.String cannot be cast to org.apache.beam.sdk.values.KV)
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12005
>                 URL: https://issues.apache.org/jira/browse/BEAM-12005
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-jdbc
>    Affects Versions: 2.28.0
>            Reporter: Gaurav Khandelwal
>            Priority: P3
>              Labels: ClassCastException, JdbcIO, MySQL, apache-beam
>
> Hi Team,
> We are getting below error :
>  org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> org.apache.beam.sdk.values.KV
>   
>  Our target is to load file into database. We tried following approach: 
> {code:java}
> @SuppressWarnings("unchecked")
>       public static void main(String[] args) {
>               PCSI02AOptions options = 
> PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PCSI02AOptions.class);
>               Pipeline p = Pipeline.create(options);
>               PCollection data1 = p.apply("Reading Text", 
> TextIO.read().from(options.getInputFile()))
>                                       .apply(ParDo.of(new GetRatePlanID()))
>                                       .apply("Format Result", 
>                                                       
> MapElements.into(TypeDescriptors.strings())
>                                                       .via((KV<String, 
> Integer> ABC) -> ABC.getKey() + "," + +ABC.getValue()));
>               data1.apply(JdbcIO.<KV<String, Iterable<Integer>>, 
> String>readAll()
>                               
> .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
>                                               
> .create("com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/ABC")
>                                               .withUsername("abc")
>                                               .withPassword("abc123"))
>                               .withCoder(StringUtf8Coder.of())
>                               .withParameterSetter(new 
> JdbcIO.PreparedStatementSetter<KV<String, Iterable<Integer>>>() {
>                                       @Override
>                                       public void setParameters(KV<String, 
> Iterable<Integer>> element,
>                                                       PreparedStatement 
> preparedStatement) throws Exception {
>                                               String[] range = 
> element.getKey().split(",");
>                                               preparedStatement.setInt(1, 
> Integer.parseInt(range[0]));
>                                       }
>                               }).withQuery("select * from ABC.PAY_PLAN_INFO 
> where plan_key = ?")
>                               .withRowMapper((JdbcIO.RowMapper<String>) 
> resultSet -> {
>                                       ObjectMapper mapper = new 
> ObjectMapper();
>                                       ArrayNode arrayNode = 
> mapper.createArrayNode();
>                                       for (int i = 1; i <= 
> resultSet.getMetaData().getColumnCount(); i++) {
>                                               try {
>                                                       ObjectNode objectNode = 
> mapper.createObjectNode();
>                                                       
> objectNode.put("column_name",resultSet.getMetaData().getColumnName(i));
>                                                       
> objectNode.put("value",resultSet.getString(i));
>                                                       
> arrayNode.add(objectNode);
>                                               } catch (Exception e) {
>                                                       throw e;
>                                               }
>                                       }
>                                       return 
> mapper.writeValueAsString(arrayNode);
>                               })
>               )
>               ;
>               State result = p.run().waitUntilFinish();
>               System.out.println(result);
>       }
> private static class GetPlanID extends DoFn<String, KV<String, Integer>> {
>               @ProcessElement
>               public void processElement(ProcessContext c)
>               {
>                       String[] data = c.element().split(",");
>                       Integer plankey = Integer.parseInt(data[0]);
>                       String planid = data[1];
>                       c.output(KV.of(planid, plankey));
>               }
>       }{code}
>  
> Error:
> {code:java}
> Exception in thread "main" 
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> org.apache.beam.sdk.values.KVException in thread "main" 
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> org.apache.beam.sdk.values.KV 
>     at 
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371)
>  
>     at 
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339)
>  
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219) 
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67) 
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322) 
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308) 
>     at com.loblaw.pcinsiders.jobflow.FiletoDB.main(FiletoDB.java:120)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
> org.apache.beam.sdk.values.KV 
>     at 
> com.loblaw.pcinsiders.jobflow.FiletoDB$1.setParameters(FiletoDB.java:1) 
>     at 
> org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:910){code}
>  
>  Kindly suggest how we can resolve it ? Or do we have any reference for same 
> if we have kindly share link or snippets.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to