Hey Jorge,
There is a config value for flushing on insert— make sure that is set to false. 
 If it is set to true, The writer to accumulo will flush after every triple 
which would slow performance down.
Hope this helps!
Thanks,
Puja

Sent from my iPhone

> On Feb 13, 2018, at 3:43 AM, Jorge Machado <jom...@me.com> wrote:
> 
> 
> 
> Hi Guys, 
> 
> I just give it a test drive to rya over a spark job on aws with 5 accumulo 
> instances. 
> the performance is really really slow getting only 2000 records per second.
> each commit to accumulo takes rought 20ms
> 
> Any kind of  trick here or did I miss something ? 
> 
> Here is my code (I changed the code to run on aws of course): 
>> package template.spark
>> 
>> import java.io.File
>> 
>> import org.apache.accumulo.core.client.{ClientConfiguration, Connector, 
>> ZooKeeperInstance}
>> import org.apache.accumulo.core.client.security.tokens.PasswordToken
>> import org.apache.rya.accumulo.{AccumuloRdfConfiguration, AccumuloRyaDAO}
>> import org.apache.rya.rdftriplestore.{RdfCloudTripleStore, RyaSailRepository}
>> import org.openrdf.model.Resource
>> import org.openrdf.model.impl.ValueFactoryImpl
>> import org.openrdf.repository.sail.SailRepositoryConnection
>> 
>> 
>> final case class Person(firstName: String, lastName: String,
>>                         country: String, age: Int)
>> 
>> object Main extends InitSpark {
>>   def main(args: Array[String]) = {
>>     import spark.implicits._
>> 
>>     val version = spark.version
>>     val dataset = 
>> spark.read.parquet("/Users/jorge/Downloads/test-d4852e42712.gz.parquet")
>>       dataset.foreachPartition(itr => {
>>           System.out.println(System.currentTimeMillis()+ " Starting to get 
>> connector")
>>           val conn = ryaConnection.getConnection
>>           val a = itr.flatMap(row=>{
>>               row.schema.map(field=>{
>>                   val any  = row.get(row.schema.fieldIndex(field.name))
>>                   val subject = 
>> ryaConnection.vf.createURI("urn:fdc:gfk.com:19980923:mySubject")
>>                   val predicate = 
>> ryaConnection.vf.createURI("urn:fdc:gfk.com:"+field.name)
>>                   val obj = ryaConnection.vf.createLiteral(any.toString)
>>                   ryaConnection.vf.createStatement(subject,predicate,obj)
>>               })
>>           })
>>           System.out.println(System.currentTimeMillis()+ " Start writting 
>> data")
>>           a.foreach(conn.add(_))
>>           System.out.println("Finished Partition")
>>           conn.close()
>>       })
>>   }
>> 
>> 
>>     object ryaConnection{
>>         val vf = new ValueFactoryImpl()
>>         val store = new RdfCloudTripleStore()
>>         val conf = new AccumuloRdfConfiguration()
>>         conf.addResource(new 
>> File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/hdfs-site.xml").toURI.toURL)
>>         conf.addResource(new 
>> File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/core-site.xml").toURI.toURL)
>>         conf.addResource(new 
>> File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/accumulo-site.xml").toURI.toURL)
>>         //conf.addResource(new 
>> File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/client.conf").toURI.toURL)
>>         val dao = new AccumuloRyaDAO()
>>         val pass = new PasswordToken("accumulo")
>>         val connector: Connector = new 
>> ZooKeeperInstance("hdp-accumulo-instance", 
>> "sandbox-hdp.hortonworks.com:2181")
>>                 .getConnector("root",pass)
>>         System.out.println("got Connector")
>>         dao.setConnector(connector)
>>         conf.setTablePrefix("rya_")
>>         dao.setConf(conf)
>>         store.setRyaDAO(dao)
>>         val myRepository = new RyaSailRepository(store)
>>         myRepository.initialize()
>> 
>>         def getConnection: SailRepositoryConnection ={
>>              myRepository.getConnection
>>         }
>> 
>>     }
>> 
>> }
>> Jorge
>> 
>> 
> <logs.zip>
>> 
>> 
>> 
> 

Reply via email to