Is that flush setting coming from the Accumulo BatchWriters?
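
In case it is the BatchWriters, here is a minimal sketch of the knobs on Accumulo's
BatchWriterConfig (the values are illustrative only, and Rya creates its writers
internally, so this is just to show where the flush/latency settings live):

    import java.util.concurrent.TimeUnit
    import org.apache.accumulo.core.client.BatchWriterConfig

    // Illustrative settings, not recommendations for any particular cluster.
    val bwConfig = new BatchWriterConfig()
      .setMaxMemory(50L * 1024 * 1024)     // buffer ~50 MB of mutations client-side
      .setMaxLatency(2, TimeUnit.MINUTES)  // flush at least every 2 minutes
      .setMaxWriteThreads(4)               // parallel writes to tablet servers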

Jorge Machado
www.jmachado.me





> On 13 Feb 2018, at 13:13, Puja Valiyil <puja...@gmail.com> wrote:
> 
> Hey Jorge,
> There is a config value for flushing on insert - make sure that it is set to
> false. If it is set to true, the writer to Accumulo will flush after every
> triple, which would slow performance down.
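> 
> A rough sketch of what that could look like (I am assuming a setFlush setter
> on the Rya configuration here; please double-check the exact method or
> property name against your Rya version):
> 
>     // Assumed API: disable flush-after-every-triple before wiring up the DAO.
>     val conf = new AccumuloRdfConfiguration()
>     conf.setFlush(false) // assumed setter name; verify against your Rya release
>     dao.setConf(conf)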
> Hope this helps!
> Thanks,
> Puja
> 
> Sent from my iPhone
> 
>> On Feb 13, 2018, at 3:43 AM, Jorge Machado <jom...@me.com> wrote:
>> 
>> 
>> 
>> Hi guys,
>> 
>> I just gave Rya a test drive from a Spark job on AWS with 5 Accumulo
>> instances. The performance is really slow: I am only getting 2000 records
>> per second, and each commit to Accumulo takes roughly 20 ms.
>> 
>> Any trick here, or did I miss something?
>> 
>> Here is my code (I changed it to run on AWS, of course):
>>> package template.spark
>>> 
>>> import java.io.File
>>> 
>>> import org.apache.accumulo.core.client.{ClientConfiguration, Connector, ZooKeeperInstance}
>>> import org.apache.accumulo.core.client.security.tokens.PasswordToken
>>> import org.apache.rya.accumulo.{AccumuloRdfConfiguration, AccumuloRyaDAO}
>>> import org.apache.rya.rdftriplestore.{RdfCloudTripleStore, RyaSailRepository}
>>> import org.openrdf.model.Resource
>>> import org.openrdf.model.impl.ValueFactoryImpl
>>> import org.openrdf.repository.sail.SailRepositoryConnection
>>> 
>>> 
>>> final case class Person(firstName: String, lastName: String,
>>>                        country: String, age: Int)
>>> 
>>> object Main extends InitSpark {
>>>  def main(args: Array[String]): Unit = {
>>>    import spark.implicits._
>>> 
>>>    val version = spark.version
>>>    val dataset =
>>>      spark.read.parquet("/Users/jorge/Downloads/test-d4852e42712.gz.parquet")
>>>      dataset.foreachPartition(itr => {
>>>          System.out.println(System.currentTimeMillis() + " Starting to get connector")
>>>          val conn = ryaConnection.getConnection
>>>          // Turn every column of every row into one RDF statement.
>>>          val a = itr.flatMap(row => {
>>>              row.schema.map(field => {
>>>                  val any = row.get(row.schema.fieldIndex(field.name))
>>>                  val subject = ryaConnection.vf.createURI("urn:fdc:gfk.com:19980923:mySubject")
>>>                  val predicate = ryaConnection.vf.createURI("urn:fdc:gfk.com:" + field.name)
>>>                  val obj = ryaConnection.vf.createLiteral(any.toString)
>>>                  ryaConnection.vf.createStatement(subject, predicate, obj)
>>>              })
>>>          })
>>>          System.out.println(System.currentTimeMillis() + " Start writing data")
>>>          a.foreach(conn.add(_))
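>>>          // Note (hedged): with autocommit on, each add() above commits on its
>>>          // own, which would line up with the ~20 ms per statement seen here.
>>>          // One possible fix is batching the adds into a single transaction via
>>>          // the standard OpenRDF RepositoryConnection calls, e.g.:
>>>          //   conn.begin(); a.foreach(conn.add(_)); conn.commit()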
>>>          System.out.println("Finished Partition")
>>>          conn.close()
>>>      })
>>>  }
>>> 
>>> 
>>>    object ryaConnection{
>>>        val vf = new ValueFactoryImpl()
>>>        val store = new RdfCloudTripleStore()
>>>        val conf = new AccumuloRdfConfiguration()
>>>        conf.addResource(new File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/hdfs-site.xml").toURI.toURL)
>>>        conf.addResource(new File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/core-site.xml").toURI.toURL)
>>>        conf.addResource(new File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/accumulo-site.xml").toURI.toURL)
>>>        //conf.addResource(new File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/client.conf").toURI.toURL)
>>>        val dao = new AccumuloRyaDAO()
>>>        val pass = new PasswordToken("accumulo")
>>>        val connector: Connector =
>>>            new ZooKeeperInstance("hdp-accumulo-instance", "sandbox-hdp.hortonworks.com:2181")
>>>                .getConnector("root", pass)
>>>        System.out.println("got Connector")
>>>        dao.setConnector(connector)
>>>        conf.setTablePrefix("rya_")
>>>        dao.setConf(conf)
>>>        store.setRyaDAO(dao)
>>>        val myRepository = new RyaSailRepository(store)
>>>        myRepository.initialize()
>>> 
>>>        def getConnection: SailRepositoryConnection = {
>>>             myRepository.getConnection
>>>        }
>>> 
>>>    }
>>> 
>>> }
>>> Jorge
>> <logs.zip>
