[ https://issues.apache.org/jira/browse/PHOENIX-5727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rahul updated PHOENIX-5727:
---------------------------
    Description: 
Hi,

I have a Spark job that reads from a Kafka stream and writes to a Phoenix table 
through the Phoenix JDBC thick client, committing every 500 rows. What I have 
observed is that the job intermittently fails to apply the upserts, silently and 
without throwing any errors. The incoming data rate is around 1000 rows/sec.

My input data is such that there will be more updates to existing row keys than 
inserts of new rows.

Is this a known issue with Phoenix?

 

Sample Code

A and B form the composite primary key; the commit size is 500.

 

import java.sql.DriverManager

// Register the Phoenix thick-client JDBC driver and open a connection.
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
val con_startTimeMillis = System.currentTimeMillis()
val con = DriverManager.getConnection("jdbc:phoenix:localhost")
println(">>>> time taken for connection::" +
  (System.currentTimeMillis() - con_startTimeMillis).toDouble / 1000 + " secs")
con.setAutoCommit(false)

// Loop over the records of the batch ("rec" is one Spark Row; bs, commitSize
// and phoenix_tablename are defined elsewhere in the job).
for (rec <- records) {
  var a = rec.getAs("A").toString
  var b = rec.getAs("B").toString
  var c = rec.getAs("C").toString
  var d = if (rec.getAs("D") == null) "" else rec.getAs("D").toString
  var e = if (rec.getAs("E") == null) "" else rec.getAs("E").toString
  var f = if (rec.getAs("F") == null) "" else rec.getAs("F").toString
  var g = if (rec.getAs("G") == null) "" else rec.getAs("G").toString
  var h = if (rec.getAs("H") == null) "0" else rec.getAs("H").toString

  var upsert_stmt = "upsert into " + phoenix_tablename + " values ('" + a + "','" +
    b + "','" + c + "','" + d + "','" + e + "','" + f + "','" + g + "','" + h + "')"
  println(">>>>upsert statement formed::" + upsert_stmt)

  var stmt = con.prepareStatement(upsert_stmt)
  stmt.executeUpdate()

  bs = bs + 1
  if (bs % commitSize == 0) {
    con.commit()
  }
}

con.commit()
con.close()
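
For comparison, here is the same write loop using a parameterized PreparedStatement 
prepared once outside the loop, instead of concatenating values into the SQL string 
(which breaks as soon as a value contains a single quote). This is only a sketch of 
the pattern, not the actual job code: writePartition and its parameters are made-up 
names, and the column handling mirrors the sample above (columns A through H, with 
H defaulting to "0" and the others to "").

import java.sql.DriverManager
import org.apache.spark.sql.Row

// Sketch only: writePartition, records, phoenixTableName and commitSize are
// placeholder names, not the job's real identifiers.
def writePartition(records: Iterator[Row], phoenixTableName: String, commitSize: Int): Unit = {
  Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
  val con = DriverManager.getConnection("jdbc:phoenix:localhost")
  con.setAutoCommit(false)
  try {
    // Phoenix supports ? bind parameters in UPSERT, so prepare once and reuse.
    val upsert = con.prepareStatement(
      "UPSERT INTO " + phoenixTableName + " VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
    var bs = 0
    for (rec <- records) {
      Seq("A", "B", "C", "D", "E", "F", "G", "H").zipWithIndex.foreach {
        case (col, i) =>
          val raw = rec.getAs[Any](col)
          val default = if (col == "H") "0" else ""   // same defaults as the sample
          upsert.setString(i + 1, if (raw == null) default else raw.toString)
      }
      upsert.executeUpdate()
      bs += 1
      if (bs % commitSize == 0) con.commit()   // flush every commitSize rows
    }
    con.commit()   // commit the final partial batch
  } finally {
    con.close()    // release the connection even if an upsert or commit throws
  }
}

Preparing the statement once also avoids re-parsing the SQL for every row, and the 
try/finally ensures the connection is closed when an executeUpdate or commit fails, 
so such a failure surfaces as an exception rather than passing unnoticed.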

 

> Intermittent Upserts with Kafka and Spark Streaming
> ---------------------------------------------------
>
>                 Key: PHOENIX-5727
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-5727
>             Project: Phoenix
>          Issue Type: Bug
>    Affects Versions: 4.14.0
>            Reporter: Rahul
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
