From Accumulo BatchWriters ? 

Jorge Machado

> On 13 Feb 2018, at 13:13, Puja Valiyil <> wrote:
> Hey Jorge,
> There is a config value for flushing on insert — make sure that it is set to
> false. If it is set to true, the writer to Accumulo will flush after every
> triple, which would slow performance down.
> Hope this helps!
> Thanks,
> Puja
> Sent from my iPhone
>> On Feb 13, 2018, at 3:43 AM, Jorge Machado <> wrote:
>> Hi guys,
>> I just gave Rya a test drive via a Spark job on AWS with 5 Accumulo
>> instances.
>> The performance is really, really slow: I'm only getting 2,000 records per second.
>> Each commit to Accumulo takes roughly 20 ms.
>> Is there some kind of trick here, or did I miss something?
>> Here is my code (I changed the code to run on AWS, of course):
>>> package template.spark
>>> import
>>> import org.apache.accumulo.core.client.{ClientConfiguration, Connector, 
>>> ZooKeeperInstance}
>>> import
>>> import org.apache.rya.accumulo.{AccumuloRdfConfiguration, AccumuloRyaDAO}
>>> import org.apache.rya.rdftriplestore.{RdfCloudTripleStore, 
>>> RyaSailRepository}
>>> import org.openrdf.model.Resource
>>> import org.openrdf.model.impl.ValueFactoryImpl
>>> import org.openrdf.repository.sail.SailRepositoryConnection
>>> /** Immutable record holding a first name, last name, country and age.
>>>   * NOTE(review): not referenced in the visible code — the dataset
>>>   * construction below was redacted by the archive, so its use is unclear. */
>>> final case class Person(
>>>     firstName: String,
>>>     lastName: String,
>>>     country: String,
>>>     age: Int
>>> )
>>> /** Spark entry point that turns dataset rows into RDF statements and writes
>>>   * them to Rya on Accumulo, opening one repository connection per partition.
>>>   * NOTE(review): the mail archive redacted several lines below (the dataset
>>>   * source, the row-to-field mapping, the URIs), so this does not compile as shown. */
>>> object Main extends InitSpark {
>>>  def main(args: Array[String]) = {
>>>    import spark.implicits._
>>>    val version = spark.version
>>>    // NOTE(review): right-hand side of `dataset` was lost in the archive.
>>>    val dataset = 
>>>      dataset.foreachPartition(itr => {
>>>          System.out.println(System.currentTimeMillis()+ " Starting to get 
>>> connector")
>>>          // One SailRepositoryConnection per partition, taken from the shared
>>>          // ryaConnection object; it is closed at the end of the partition.
>>>          val conn = ryaConnection.getConnection
>>>          // Lazily maps each row to statements (URI subject/predicate + literal
>>>          // object); the per-field mapping lines are partly redacted.
>>>          val a = itr.flatMap(row=>{
>>>    >{
>>>                  val any  = row.get(row.schema.fieldIndex(
>>>                  val subject = 
>>> ryaConnection.vf.createURI("")
>>>                  val predicate = 
>>> ryaConnection.vf.createURI(""
>>>                  val obj = ryaConnection.vf.createLiteral(any.toString)
>>>                  ryaConnection.vf.createStatement(subject,predicate,obj)
>>>              })
>>>          })
>>>          System.out.println(System.currentTimeMillis()+ " Start writting 
>>> data")
>>>          // Statements are added one at a time. NOTE(review): per the reply in
>>>          // this thread, if flush-on-insert is enabled the Accumulo writer flushes
>>>          // after every triple, consistent with the ~20 ms per commit reported.
>>>          // NOTE(review): conn.close() below is skipped if add() throws; a
>>>          // try/finally around the writes would guarantee the connection is closed.
>>>          a.foreach(conn.add(_))
>>>          System.out.println("Finished Partition")
>>>          conn.close()
>>>      })
>>>  }
>>>    // Shared Rya setup. A Scala `object` is initialized on first access within a
>>>    // JVM, so each Spark executor presumably builds its own connector, DAO and
>>>    // repository the first time a partition touches it.
>>>    object ryaConnection{
>>>        val vf = new ValueFactoryImpl()
>>>        val store = new RdfCloudTripleStore()
>>>        val conf = new AccumuloRdfConfiguration()
>>>        // NOTE(review): hard-coded local client-config paths; presumably these
>>>        // must also exist on every executor host for the job to run on a cluster.
>>>        conf.addResource(new 
>>> File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/hdfs-site.xml").toURI.toURL)
>>>        conf.addResource(new 
>>> File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/core-site.xml").toURI.toURL)
>>>        conf.addResource(new 
>>> File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/accumulo-site.xml").toURI.toURL)
>>>        // NOTE(review): the mail client wrapped the commented-out line below; its
>>>        // continuation line is not commented as quoted.
>>>        //conf.addResource(new 
>>> File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/client.conf").toURI.toURL)
>>>        val dao = new AccumuloRyaDAO()
>>>        // NOTE(review): hard-coded root credentials; the ZooKeeper host string
>>>        // passed to ZooKeeperInstance below appears to have been redacted ("").
>>>        val pass = new PasswordToken("accumulo")
>>>        val connector: Connector = new 
>>> ZooKeeperInstance("hdp-accumulo-instance", 
>>> "")
>>>                .getConnector("root",pass)
>>>        System.out.println("got Connector")
>>>        dao.setConnector(connector)
>>>        // Rya settings live on `conf`, which is handed to the DAO below; the
>>>        // flush-on-insert option from the reply would presumably be set here too.
>>>        // NOTE(review): in Rya this may be conf.setFlush(false) — verify the setter
>>>        // name against your Rya version.
>>>        conf.setTablePrefix("rya_")
>>>        dao.setConf(conf)
>>>        store.setRyaDAO(dao)
>>>        val myRepository = new RyaSailRepository(store)
>>>        myRepository.initialize()
>>>        // Returns a connection from the shared repository; callers close it
>>>        // (main closes it at the end of each partition).
>>>        def getConnection: SailRepositoryConnection ={
>>>             myRepository.getConnection
>>>        }
>>>    }
>>> }
>>> Jorge
>> <>

Reply via email to