Thanks! Will try that out.
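
For anyone finding this thread later, here is a minimal sketch of the suggestion quoted below, reusing the configuration object from my original code. The object and method names (`FlushConfigSketch`, `buildConf`) are made up for illustration, and `setFlush` is what I believe the setter on `AccumuloRdfConfiguration` is called; I have not verified it, so please check it against the Rya version on your classpath.

```scala
import org.apache.rya.accumulo.AccumuloRdfConfiguration

object FlushConfigSketch {
  // Builds a Rya configuration with per-insert flushing turned off.
  def buildConf(): AccumuloRdfConfiguration = {
    val conf = new AccumuloRdfConfiguration()
    conf.setTablePrefix("rya_")
    // Assumption: setFlush(boolean) controls the flush-on-insert option.
    // With false, the DAO should no longer flush after every triple.
    conf.setFlush(false)
    conf
  }
}
```

The returned object would be passed to `dao.setConf(...)` exactly as in the code below. With per-insert flushing off, I would expect the batch writer to send buffered mutations when its buffer fills or when the connection is closed, so `conn.close()` at the end of each partition still matters.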


> On 13 Feb 2018, at 15:10, Puja Valiyil <puja...@gmail.com> wrote:
> 
> Yes.  There is a config parameter on the AccumuloRdfConfiguration that
> specifies whether or not to flush after every insert.  If you set this to
> "false", then the ingest times should improve.  If set to true, the flush
> method on the Accumulo MultiTableBatchWriter is called after each
> triple is inserted.
> 
> 
> On Tue, Feb 13, 2018 at 9:04 AM, Jorge Machado <jom...@me.com> wrote:
> 
>> From Accumulo BatchWriters?
>> 
>> Jorge Machado
>> www.jmachado.me
>> 
>> 
>> 
>> 
>> 
>>> On 13 Feb 2018, at 13:13, Puja Valiyil <puja...@gmail.com> wrote:
>>> 
>>> Hey Jorge,
>>> There is a config value for flushing on insert; make sure that is set to
>>> false.  If it is set to true, the writer to Accumulo will flush after every
>>> triple, which would slow performance down.
>>> Hope this helps!
>>> Thanks,
>>> Puja
>>> 
>>> Sent from my iPhone
>>> 
>>>> On Feb 13, 2018, at 3:43 AM, Jorge Machado <jom...@me.com> wrote:
>>>> 
>>>> 
>>>> 
>>>> Hi Guys,
>>>> 
>>>> I just gave Rya a test drive from a Spark job on AWS with 5
>>>> Accumulo instances.
>>>> The performance is really slow: I am getting only 2000 records per
>>>> second.
>>>> Each commit to Accumulo takes roughly 20 ms.
>>>> 
>>>> Is there any kind of trick here, or did I miss something?
>>>> 
>>>> Here is my code (I changed the code to run on AWS, of course):
>>>>> package template.spark
>>>>> 
>>>>> import java.io.File
>>>>> 
>>>>> import org.apache.accumulo.core.client.{ClientConfiguration, Connector, ZooKeeperInstance}
>>>>> import org.apache.accumulo.core.client.security.tokens.PasswordToken
>>>>> import org.apache.rya.accumulo.{AccumuloRdfConfiguration, AccumuloRyaDAO}
>>>>> import org.apache.rya.rdftriplestore.{RdfCloudTripleStore, RyaSailRepository}
>>>>> import org.openrdf.model.Resource
>>>>> import org.openrdf.model.impl.ValueFactoryImpl
>>>>> import org.openrdf.repository.sail.SailRepositoryConnection
>>>>> 
>>>>> 
>>>>> final case class Person(firstName: String, lastName: String,
>>>>>                       country: String, age: Int)
>>>>> 
>>>>> object Main extends InitSpark {
>>>>> def main(args: Array[String]) = {
>>>>>   import spark.implicits._
>>>>> 
>>>>>   val version = spark.version
>>>>>   val dataset = spark.read.parquet("/Users/jorge/Downloads/test-d4852e42712.gz.parquet")
>>>>>     dataset.foreachPartition(itr => {
>>>>>         System.out.println(System.currentTimeMillis()+ " Starting to get connector")
>>>>>         val conn = ryaConnection.getConnection
>>>>>         val a = itr.flatMap(row=>{
>>>>>             row.schema.map(field=>{
>>>>>                 val any  = row.get(row.schema.fieldIndex(field.name))
>>>>>                 val subject = ryaConnection.vf.createURI("urn:fdc:gfk.com:19980923:mySubject")
>>>>>                 val predicate = ryaConnection.vf.createURI("urn:fdc:gfk.com:"+field.name)
>>>>>                 val obj = ryaConnection.vf.createLiteral(any.toString)
>>>>>                 ryaConnection.vf.createStatement(subject, predicate,obj)
>>>>>             })
>>>>>         })
>>>>>         System.out.println(System.currentTimeMillis()+ " Start writing data")
>>>>>         a.foreach(conn.add(_))
>>>>>         System.out.println("Finished Partition")
>>>>>         conn.close()
>>>>>     })
>>>>> }
>>>>> 
>>>>> 
>>>>>   object ryaConnection{
>>>>>       val vf = new ValueFactoryImpl()
>>>>>       val store = new RdfCloudTripleStore()
>>>>>       val conf = new AccumuloRdfConfiguration()
>>>>>       conf.addResource(new File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/hdfs-site.xml").toURI.toURL)
>>>>>       conf.addResource(new File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/core-site.xml").toURI.toURL)
>>>>>       conf.addResource(new File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/accumulo-site.xml").toURI.toURL)
>>>>>       //conf.addResource(new File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/client.conf").toURI.toURL)
>>>>>       val dao = new AccumuloRyaDAO()
>>>>>       val pass = new PasswordToken("accumulo")
>>>>>       val connector: Connector = new ZooKeeperInstance("hdp-accumulo-instance", "sandbox-hdp.hortonworks.com:2181")
>>>>>               .getConnector("root",pass)
>>>>>       System.out.println("got Connector")
>>>>>       dao.setConnector(connector)
>>>>>       conf.setTablePrefix("rya_")
>>>>>       dao.setConf(conf)
>>>>>       store.setRyaDAO(dao)
>>>>>       val myRepository = new RyaSailRepository(store)
>>>>>       myRepository.initialize()
>>>>> 
>>>>>       def getConnection: SailRepositoryConnection ={
>>>>>            myRepository.getConnection
>>>>>       }
>>>>> 
>>>>>   }
>>>>> 
>>>>> }
>>>>> Jorge
>>>>> 
>>>>> 
>>>> <logs.zip>
>>>>> 
>>>>> 
>>>>> 
>>>> 
>> 
>> 
