[ 
https://issues.apache.org/jira/browse/BEAM-12005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaurav Khandelwal updated BEAM-12005:
-------------------------------------
    Description: 
Hi Team,

We are getting below error :
 org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
java.lang.ClassCastException: java.lang.String cannot be cast to 
org.apache.beam.sdk.values.KV
  
 Our target is to load file into database. We tried following approach: 
{code:java}
@SuppressWarnings("unchecked")
        public static void main(String[] args) {
                PCSI02AOptions options = 
PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PCSI02AOptions.class);
                Pipeline p = Pipeline.create(options);


                PCollection data1 = p.apply("Reading Text", 
TextIO.read().from(options.getInputFile()))
                                        .apply(ParDo.of(new GetPlanID()))
                                        .apply("Format Result", 
                                                        
MapElements.into(TypeDescriptors.strings())
                                                        .via((KV<String, 
Integer> ABC) -> ABC.getKey() + "," + +ABC.getValue()));


                data1.apply(JdbcIO.<KV<String, Iterable<Integer>>, 
String>readAll()
                                
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                                                
.create("com.mysql.cj.jdbc.Driver", 
"jdbc:mysql://localhost:3306/ABC").withUsername("abc")
                                                .withPassword("abc123"))
                                .withCoder(StringUtf8Coder.of())
                                .withParameterSetter(new 
JdbcIO.PreparedStatementSetter<KV<String, Iterable<Integer>>>() {
                                        @Override
                                        public void setParameters(KV<String, 
Iterable<Integer>> element,
                                                        PreparedStatement 
preparedStatement) throws Exception {
                                                String[] range = 
element.getKey().split(",");
                                                preparedStatement.setInt(1, 
Integer.parseInt(range[0]));
                                        }


                                }).withQuery("select * from ABC.PAY_PLAN_INFO 
where plan_key = ?")
                                .withRowMapper((JdbcIO.RowMapper<String>) 
resultSet -> {
                                        ObjectMapper mapper = new 
ObjectMapper();
                                        ArrayNode arrayNode = 
mapper.createArrayNode();
                                        for (int i = 1; i <= 
resultSet.getMetaData().getColumnCount(); i++) {
                                                try {
                                                        ObjectNode objectNode = 
mapper.createObjectNode();
                                                        
objectNode.put("column_name",resultSet.getMetaData().getColumnName(i));
                                                        
objectNode.put("value",resultSet.getString(i));
                                                        
arrayNode.add(objectNode);
                                                } catch (Exception e) {
                                                        throw e;


                                                }
                                        }
                                        return 
mapper.writeValueAsString(arrayNode);


                                })
                )
                ;


                State result = p.run().waitUntilFinish();
                System.out.println(result);
        }


private static class GetPlanID extends DoFn<String, KV<String, Integer>> {
                @ProcessElement
                public void processElement(ProcessContext c)
                {
                        String[] data = c.element().split(",");
                        Integer plankey = Integer.parseInt(data[0]);
                        String planid = data[1];
                        c.output(KV.of(planid, plankey));
                }
        }{code}
 
  
 Kindly suggest how we can resolve it ? Or do we have any reference for same if 
we have kindly share link or snippets.

  was:
Hi Team,

We are getting below error :
org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
java.lang.ClassCastException: java.lang.String cannot be cast to 
org.apache.beam.sdk.values.KV
 
Our target is to load file into database. We tried following approach: 
{code:java}
public static void main(String[] args) {
        PCSI02AOptions options = 
PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PCSI02AOptions.class);
        Pipeline p = Pipeline.create(options); PCollection data1 = 
p.apply(“Reading Text”, TextIO.read().from(options.getInputFile()))
                        .apply(ParDo.of(new GetRatePlanID()))
                        .apply(“Format Result”, 
                                        
MapElements.into(TypeDescriptors.strings())
                                        .via((KV<String, Integer> ABC) -> 
ABC.getKey() + “,” + +ABC.getValue())); data1.apply(JdbcIO.<KV<String, 
Iterable<Integer>>, String>readAll()
                                                        
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                                                                        
.create("com.mysql.cj.jdbc.Driver", 
"jdbc:mysql://localhost:3306/ABC").withUsername("abc")
                                                                        
.withPassword(“abc123"))
                                                                                
        .withCoder(StringUtf8Coder.of())
                                                                                
        .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, 
Iterable<Integer>>>() {
                                                                                
                @Override
                                                                                
                public void setParameters(KV<String, Iterable<Integer>> element,
                                                                                
                                PreparedStatement preparedStatement) throws 
Exception {
                                                                                
                        String[] range = element.getKey().split(“,”);
                                                                                
                        preparedStatement.setInt(1, Integer.parseInt(range[0]));
                                                                                
                } }).withQuery(“select * from ABC.PAY_PLAN_INFO where plan_key 
= ?“)
                                                                                
        .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
                                                                                
                ObjectMapper mapper = new ObjectMapper();
                                                                                
                ArrayNode arrayNode = mapper.createArrayNode();
                                                                                
                for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); 
i++) {
                                                                                
                        try {
                                                                                
                                ObjectNode objectNode = 
mapper.createObjectNode();
                                                                                
                                
objectNode.put(“column_name”,resultSet.getMetaData().getColumnName(i));
                                                                                
                                objectNode.put(“value”,resultSet.getString(i));
                                                                                
                                arrayNode.add(objectNode);
                                                                                
                        } catch (Exception e) {
                                                                                
                                throw e; }
                                                                                
                }
                                                                                
                return mapper.writeValueAsString(arrayNode); })
                                                                                
        )
                                                                        ; State 
result = p.run().waitUntilFinish();
                                                                        
System.out.println(result);
}{code}
 
 
Kindly suggest how we can resolve it ? Or do we have any reference for same if 
we have kindly share link or snippets.


> getting issue to load file into database (java.lang.ClassCastException: 
> java.lang.String cannot be cast to org.apache.beam.sdk.values.KV)
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12005
>                 URL: https://issues.apache.org/jira/browse/BEAM-12005
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-community, beam-model, io-java-files, io-java-gcp, 
> io-java-jdbc, runner-dataflow
>    Affects Versions: 2.28.0
>            Reporter: Gaurav Khandelwal
>            Assignee: Aizhamal Nurmamat kyzy
>            Priority: P2
>
> Hi Team,
> We are getting below error :
>  org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> org.apache.beam.sdk.values.KV
>   
>  Our target is to load file into database. We tried following approach: 
> {code:java}
> @SuppressWarnings("unchecked")
>       public static void main(String[] args) {
>               PCSI02AOptions options = 
> PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PCSI02AOptions.class);
>               Pipeline p = Pipeline.create(options);
>               PCollection data1 = p.apply("Reading Text", 
> TextIO.read().from(options.getInputFile()))
>                                       .apply(ParDo.of(new GetPlanID()))
>                                       .apply("Format Result", 
>                                                       
> MapElements.into(TypeDescriptors.strings())
>                                                       .via((KV<String, 
> Integer> ABC) -> ABC.getKey() + "," + +ABC.getValue()));
>               data1.apply(JdbcIO.<KV<String, Iterable<Integer>>, 
> String>readAll()
>                               
> .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
>                                               
> .create("com.mysql.cj.jdbc.Driver", 
> "jdbc:mysql://localhost:3306/ABC").withUsername("abc")
>                                               .withPassword("abc123"))
>                               .withCoder(StringUtf8Coder.of())
>                               .withParameterSetter(new 
> JdbcIO.PreparedStatementSetter<KV<String, Iterable<Integer>>>() {
>                                       @Override
>                                       public void setParameters(KV<String, 
> Iterable<Integer>> element,
>                                                       PreparedStatement 
> preparedStatement) throws Exception {
>                                               String[] range = 
> element.getKey().split(",");
>                                               preparedStatement.setInt(1, 
> Integer.parseInt(range[0]));
>                                       }
>                               }).withQuery("select * from ABC.PAY_PLAN_INFO 
> where plan_key = ?")
>                               .withRowMapper((JdbcIO.RowMapper<String>) 
> resultSet -> {
>                                       ObjectMapper mapper = new 
> ObjectMapper();
>                                       ArrayNode arrayNode = 
> mapper.createArrayNode();
>                                       for (int i = 1; i <= 
> resultSet.getMetaData().getColumnCount(); i++) {
>                                               try {
>                                                       ObjectNode objectNode = 
> mapper.createObjectNode();
>                                                       
> objectNode.put("column_name",resultSet.getMetaData().getColumnName(i));
>                                                       
> objectNode.put("value",resultSet.getString(i));
>                                                       
> arrayNode.add(objectNode);
>                                               } catch (Exception e) {
>                                                       throw e;
>                                               }
>                                       }
>                                       return 
> mapper.writeValueAsString(arrayNode);
>                               })
>               )
>               ;
>               State result = p.run().waitUntilFinish();
>               System.out.println(result);
>       }
> private static class GetPlanID extends DoFn<String, KV<String, Integer>> {
>               @ProcessElement
>               public void processElement(ProcessContext c)
>               {
>                       String[] data = c.element().split(",");
>                       Integer plankey = Integer.parseInt(data[0]);
>                       String planid = data[1];
>                       c.output(KV.of(planid, plankey));
>               }
>       }{code}
>  
>   
>  Kindly suggest how we can resolve it ? Or do we have any reference for same 
> if we have kindly share link or snippets.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to