Gaurav Khandelwal created BEAM-12005:
----------------------------------------

             Summary: getting issue to load file into database 
(java.lang.ClassCastException: java.lang.String cannot be cast to 
org.apache.beam.sdk.values.KV)
                 Key: BEAM-12005
                 URL: https://issues.apache.org/jira/browse/BEAM-12005
             Project: Beam
          Issue Type: Bug
          Components: beam-community, beam-model, io-java-files, io-java-gcp, 
io-java-jdbc, runner-dataflow
    Affects Versions: 2.28.0
            Reporter: Gaurav Khandelwal
            Assignee: Aizhamal Nurmamat kyzy


Hi Team,

We are getting below error :
org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
java.lang.ClassCastException: java.lang.String cannot be cast to 
org.apache.beam.sdk.values.KV
 
Our target is to load file into database. We tried following approach: 
{code:java}
public static void main(String[] args) {
        PCSI02AOptions options = 
PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PCSI02AOptions.class);
        Pipeline p = Pipeline.create(options); PCollection data1 = 
p.apply(“Reading Text”, TextIO.read().from(options.getInputFile()))
                        .apply(ParDo.of(new GetRatePlanID()))
                        .apply(“Format Result”, 
                                        
MapElements.into(TypeDescriptors.strings())
                                        .via((KV<String, Integer> ABC) -> 
ABC.getKey() + “,” + +ABC.getValue())); data1.apply(JdbcIO.<KV<String, 
Iterable<Integer>>, String>readAll()
                                                        
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                                                                        
.create("com.mysql.cj.jdbc.Driver", 
"jdbc:mysql://localhost:3306/ABC").withUsername("abc")
                                                                        
.withPassword(“abc123"))
                                                                                
        .withCoder(StringUtf8Coder.of())
                                                                                
        .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, 
Iterable<Integer>>>() {
                                                                                
                @Override
                                                                                
                public void setParameters(KV<String, Iterable<Integer>> element,
                                                                                
                                PreparedStatement preparedStatement) throws 
Exception {
                                                                                
                        String[] range = element.getKey().split(“,”);
                                                                                
                        preparedStatement.setInt(1, Integer.parseInt(range[0]));
                                                                                
                } }).withQuery(“select * from ABC.PAY_PLAN_INFO where plan_key 
= ?“)
                                                                                
        .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
                                                                                
                ObjectMapper mapper = new ObjectMapper();
                                                                                
                ArrayNode arrayNode = mapper.createArrayNode();
                                                                                
                for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); 
i++) {
                                                                                
                        try {
                                                                                
                                ObjectNode objectNode = 
mapper.createObjectNode();
                                                                                
                                
objectNode.put(“column_name”,resultSet.getMetaData().getColumnName(i));
                                                                                
                                objectNode.put(“value”,resultSet.getString(i));
                                                                                
                                arrayNode.add(objectNode);
                                                                                
                        } catch (Exception e) {
                                                                                
                                throw e; }
                                                                                
                }
                                                                                
                return mapper.writeValueAsString(arrayNode); })
                                                                                
        )
                                                                        ; State 
result = p.run().waitUntilFinish();
                                                                        
System.out.println(result);
}{code}
 
 
Kindly suggest how we can resolve it ? Or do we have any reference for same if 
we have kindly share link or snippets.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to