Just tried it out on my PC and I get 150,000 entries per second locally,
which is good!
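
For anyone finding this thread later, here is a minimal sketch of the change, i.e. the flush-on-insert setting Puja describes below, turned off. It assumes the flag is exposed as setFlush on AccumuloRdfConfiguration (backed by the "ac.dao.flush" property); please verify that name against your Rya version. FlushConfigSketch and buildConf are illustrative names only, not part of Rya.

```scala
import org.apache.rya.accumulo.AccumuloRdfConfiguration

object FlushConfigSketch {
  // Builds a Rya configuration with per-insert flushing disabled, so the
  // underlying Accumulo batch writer can buffer mutations and send them in batches.
  def buildConf(tablePrefix: String): AccumuloRdfConfiguration = {
    val conf = new AccumuloRdfConfiguration()
    conf.setTablePrefix(tablePrefix)
    // Assumption: setFlush(false) is the setter for the flush-on-insert flag
    // in the Rya version in use; check the class if this does not compile.
    conf.setFlush(false)
    conf
  }
}
```

In the original code further down, this amounts to one extra call on conf before dao.setConf(conf). With the flag off, an Accumulo batch writer only sends data when its buffer fills, its maximum latency elapses, or it is flushed or closed, so the repository should be shut down cleanly at the end of the job.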



Jorge Machado
www.jmachado.me





> On 13 Feb 2018, at 15:57, Jorge Machado <jom...@me.com> wrote:
> 
> Thanks! Will try that out.
> 
> 
> 
>> On 13 Feb 2018, at 15:10, Puja Valiyil <puja...@gmail.com> wrote:
>> 
>> Yes.  There is a config parameter on the AccumuloRdfConfiguration that
>> specifies whether or not to flush after every insert.  If you set this to
>> "false", then the ingest times should improve.  If set to true, the flush
>> method on the Accumulo MultiTableBatchWriter is called after each
>> triple is inserted.
>> 
>> 
>> On Tue, Feb 13, 2018 at 9:04 AM, Jorge Machado <jom...@me.com> wrote:
>> 
>>> From Accumulo BatchWriters?
>>> 
>>> Jorge Machado
>>> www.jmachado.me
>>> 
>>> 
>>> 
>>> 
>>> 
>>>> On 13 Feb 2018, at 13:13, Puja Valiyil <puja...@gmail.com> wrote:
>>>> 
>>>> Hey Jorge,
>>>> There is a config value for flushing on insert; make sure that is set to
>>>> false.  If it is set to true, the writer to Accumulo will flush after every
>>>> triple, which would slow performance down.
>>>> Hope this helps!
>>>> Thanks,
>>>> Puja
>>>> 
>>>> Sent from my iPhone
>>>> 
>>>>> On Feb 13, 2018, at 3:43 AM, Jorge Machado <jom...@me.com> wrote:
>>>>> 
>>>>> 
>>>>> 
>>>>> Hi Guys,
>>>>> 
>>>>> I just gave Rya a test drive from a Spark job on AWS with 5
>>>>> Accumulo instances.
>>>>> The performance is really slow: I am getting only 2000 records per
>>>>> second.
>>>>> Each commit to Accumulo takes roughly 20 ms.
>>>>> 
>>>>> Any kind of trick here, or did I miss something?
>>>>> 
>>>>> Here is my code (I changed it to run on AWS, of course):
>>>>>> package template.spark
>>>>>> 
>>>>>> import java.io.File
>>>>>> 
>>>>>> import org.apache.accumulo.core.client.{ClientConfiguration, Connector, ZooKeeperInstance}
>>>>>> import org.apache.accumulo.core.client.security.tokens.PasswordToken
>>>>>> import org.apache.rya.accumulo.{AccumuloRdfConfiguration, AccumuloRyaDAO}
>>>>>> import org.apache.rya.rdftriplestore.{RdfCloudTripleStore, RyaSailRepository}
>>>>>> import org.openrdf.model.Resource
>>>>>> import org.openrdf.model.impl.ValueFactoryImpl
>>>>>> import org.openrdf.repository.sail.SailRepositoryConnection
>>>>>> 
>>>>>> 
>>>>>> final case class Person(firstName: String, lastName: String,
>>>>>>                      country: String, age: Int)
>>>>>> 
>>>>>> object Main extends InitSpark {
>>>>>> def main(args: Array[String]) = {
>>>>>>  import spark.implicits._
>>>>>> 
>>>>>>  val version = spark.version
>>>>>>  val dataset = spark.read.parquet("/Users/jorge/Downloads/test-d4852e42712.gz.parquet")
>>>>>>    dataset.foreachPartition(itr => {
>>>>>>        System.out.println(System.currentTimeMillis()+ " Starting to get connector")
>>>>>>        val conn = ryaConnection.getConnection
>>>>>>        val a = itr.flatMap(row=>{
>>>>>>            row.schema.map(field=>{
>>>>>>                val any  = row.get(row.schema.fieldIndex(field.name))
>>>>>>                val subject = ryaConnection.vf.createURI("urn:fdc:gfk.com:19980923:mySubject")
>>>>>>                val predicate = ryaConnection.vf.createURI("urn:fdc:gfk.com:"+field.name)
>>>>>>                val obj = ryaConnection.vf.createLiteral(any.toString)
>>>>>>                ryaConnection.vf.createStatement(subject, predicate, obj)
>>>>>>            })
>>>>>>        })
>>>>>>        System.out.println(System.currentTimeMillis()+ " Start writting data")
>>>>>>        a.foreach(conn.add(_))
>>>>>>        System.out.println("Finished Partition")
>>>>>>        conn.close()
>>>>>>    })
>>>>>> }
>>>>>> 
>>>>>> 
>>>>>>  object ryaConnection{
>>>>>>      val vf = new ValueFactoryImpl()
>>>>>>      val store = new RdfCloudTripleStore()
>>>>>>      val conf = new AccumuloRdfConfiguration()
>>>>>>      conf.addResource(new File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/hdfs-site.xml").toURI.toURL)
>>>>>>      conf.addResource(new File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/core-site.xml").toURI.toURL)
>>>>>>      conf.addResource(new File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/accumulo-site.xml").toURI.toURL)
>>>>>>      //conf.addResource(new File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/client.conf").toURI.toURL)
>>>>>>      val dao = new AccumuloRyaDAO()
>>>>>>      val pass = new PasswordToken("accumulo")
>>>>>>      val connector: Connector = new ZooKeeperInstance("hdp-accumulo-instance", "sandbox-hdp.hortonworks.com:2181")
>>>>>>              .getConnector("root",pass)
>>>>>>      System.out.println("got Connector")
>>>>>>      dao.setConnector(connector)
>>>>>>      conf.setTablePrefix("rya_")
>>>>>>      dao.setConf(conf)
>>>>>>      store.setRyaDAO(dao)
>>>>>>      val myRepository = new RyaSailRepository(store)
>>>>>>      myRepository.initialize()
>>>>>> 
>>>>>>      def getConnection: SailRepositoryConnection ={
>>>>>>           myRepository.getConnection
>>>>>>      }
>>>>>> 
>>>>>>  }
>>>>>> 
>>>>>> }
>>>>>> Jorge
>>>>>> 
>>>>>> 
>>>>> <logs.zip>
>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>> 
>>> 
> 
