[ 
https://issues.apache.org/jira/browse/BEAM-12005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303528#comment-17303528
 ] 

Gaurav Khandelwal commented on BEAM-12005:
------------------------------------------

Hi [~aizhamal], please let me know if you need more information about the 
issue. 

> getting issue to load file into database (java.lang.ClassCastException: 
> java.lang.String cannot be cast to org.apache.beam.sdk.values.KV)
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12005
>                 URL: https://issues.apache.org/jira/browse/BEAM-12005
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-community, beam-model, io-java-files, io-java-gcp, 
> io-java-jdbc, runner-dataflow
>    Affects Versions: 2.28.0
>            Reporter: Gaurav Khandelwal
>            Assignee: Aizhamal Nurmamat kyzy
>            Priority: P2
>
> Hi Team,
> We are getting below error :
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> org.apache.beam.sdk.values.KV
>  
> Our target is to load file into database. We tried following approach: 
> {code:java}
> public static void main(String[] args) {
>       PCSI02AOptions options = 
> PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PCSI02AOptions.class);
>       Pipeline p = Pipeline.create(options); PCollection data1 = 
> p.apply(“Reading Text”, TextIO.read().from(options.getInputFile()))
>                       .apply(ParDo.of(new GetRatePlanID()))
>                       .apply(“Format Result”, 
>                                       
> MapElements.into(TypeDescriptors.strings())
>                                       .via((KV<String, Integer> ABC) -> 
> ABC.getKey() + “,” + +ABC.getValue())); data1.apply(JdbcIO.<KV<String, 
> Iterable<Integer>>, String>readAll()
>                                                       
> .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
>                                                                       
> .create("com.mysql.cj.jdbc.Driver", 
> "jdbc:mysql://localhost:3306/ABC").withUsername("abc")
>                                                                       
> .withPassword(“abc123"))
>                                                                               
>         .withCoder(StringUtf8Coder.of())
>                                                                               
>         .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, 
> Iterable<Integer>>>() {
>                                                                               
>                 @Override
>                                                                               
>                 public void setParameters(KV<String, Iterable<Integer>> 
> element,
>                                                                               
>                                 PreparedStatement preparedStatement) throws 
> Exception {
>                                                                               
>                         String[] range = element.getKey().split(“,”);
>                                                                               
>                         preparedStatement.setInt(1, 
> Integer.parseInt(range[0]));
>                                                                               
>                 } }).withQuery(“select * from ABC.PAY_PLAN_INFO where 
> plan_key = ?“)
>                                                                               
>         .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
>                                                                               
>                 ObjectMapper mapper = new ObjectMapper();
>                                                                               
>                 ArrayNode arrayNode = mapper.createArrayNode();
>                                                                               
>                 for (int i = 1; i <= 
> resultSet.getMetaData().getColumnCount(); i++) {
>                                                                               
>                         try {
>                                                                               
>                                 ObjectNode objectNode = 
> mapper.createObjectNode();
>                                                                               
>                                 
> objectNode.put(“column_name”,resultSet.getMetaData().getColumnName(i));
>                                                                               
>                                 
> objectNode.put(“value”,resultSet.getString(i));
>                                                                               
>                                 arrayNode.add(objectNode);
>                                                                               
>                         } catch (Exception e) {
>                                                                               
>                                 throw e; }
>                                                                               
>                 }
>                                                                               
>                 return mapper.writeValueAsString(arrayNode); })
>                                                                               
>         )
>                                                                       ; State 
> result = p.run().waitUntilFinish();
>                                                                       
> System.out.println(result);
> }{code}
>  
>  
> Kindly suggest how we can resolve it ? Or do we have any reference for same 
> if we have kindly share link or snippets.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to