Thanks! I will try that out.
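A minimal sketch of the change I have in mind, assuming the setter on AccumuloRdfConfiguration is named setFlush (I have not verified the exact method name, so check the class's javadocs/source before relying on it):

    // Sketch: disable flushing the MultiTableBatchWriter after every
    // triple, per the suggestion below. setFlush(false) is an assumption;
    // verify the actual setter name on AccumuloRdfConfiguration.
    val conf = new AccumuloRdfConfiguration()
    conf.setTablePrefix("rya_")
    conf.setFlush(false) // let the BatchWriter buffer mutations instead of flushing per insert
    dao.setConf(conf)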
> On 13 Feb 2018, at 15:10, Puja Valiyil <[email protected]> wrote:
>
> Yes. There is a config parameter on the AccumuloRdfConfiguration that
> specifies whether or not to flush after every insert. If you set this to
> "false", the ingest times should improve. If it is set to "true", the
> flush method on the Accumulo MultiTableBatchWriter is called after each
> triple is inserted.
>
> On Tue, Feb 13, 2018 at 9:04 AM, Jorge Machado <[email protected]> wrote:
>
>> From the Accumulo BatchWriters?
>>
>> Jorge Machado
>> www.jmachado.me
>>
>>> On 13 Feb 2018, at 13:13, Puja Valiyil <[email protected]> wrote:
>>>
>>> Hey Jorge,
>>> There is a config value for flushing on insert; make sure that is set
>>> to false. If it is set to true, the writer to Accumulo will flush after
>>> every triple, which slows performance down.
>>> Hope this helps!
>>> Thanks,
>>> Puja
>>>
>>> Sent from my iPhone
>>>
>>>> On Feb 13, 2018, at 3:43 AM, Jorge Machado <[email protected]> wrote:
>>>>
>>>> Hi Guys,
>>>>
>>>> I just gave Rya a test drive in a Spark job on AWS with 5 Accumulo
>>>> instances. The performance is really slow: I am getting only 2000
>>>> records per second, and each commit to Accumulo takes roughly 20 ms.
>>>>
>>>> Any trick here, or did I miss something?
>>>>
>>>> Here is my code (I changed the code to run on AWS, of course):
>>>>> package template.spark
>>>>>
>>>>> import java.io.File
>>>>>
>>>>> import org.apache.accumulo.core.client.{Connector, ZooKeeperInstance}
>>>>> import org.apache.accumulo.core.client.security.tokens.PasswordToken
>>>>> import org.apache.rya.accumulo.{AccumuloRdfConfiguration, AccumuloRyaDAO}
>>>>> import org.apache.rya.rdftriplestore.{RdfCloudTripleStore, RyaSailRepository}
>>>>> import org.openrdf.model.impl.ValueFactoryImpl
>>>>> import org.openrdf.repository.sail.SailRepositoryConnection
>>>>>
>>>>> object Main extends InitSpark {
>>>>>   def main(args: Array[String]): Unit = {
>>>>>     val dataset = spark.read.parquet("/Users/jorge/Downloads/test-d4852e42712.gz.parquet")
>>>>>     dataset.foreachPartition(itr => {
>>>>>       System.out.println(System.currentTimeMillis() + " Starting to get connector")
>>>>>       val conn = ryaConnection.getConnection
>>>>>       // Turn every cell of every row into one RDF statement.
>>>>>       val statements = itr.flatMap(row => {
>>>>>         row.schema.map(field => {
>>>>>           val value = row.get(row.schema.fieldIndex(field.name))
>>>>>           val subject = ryaConnection.vf.createURI("urn:fdc:gfk.com:19980923:mySubject")
>>>>>           val predicate = ryaConnection.vf.createURI("urn:fdc:gfk.com:" + field.name)
>>>>>           val obj = ryaConnection.vf.createLiteral(value.toString)
>>>>>           ryaConnection.vf.createStatement(subject, predicate, obj)
>>>>>         })
>>>>>       })
>>>>>       System.out.println(System.currentTimeMillis() + " Start writing data")
>>>>>       statements.foreach(conn.add(_))
>>>>>       System.out.println("Finished Partition")
>>>>>       conn.close()
>>>>>     })
>>>>>   }
>>>>>
>>>>>   // One Rya repository per executor JVM: Scala objects are
>>>>>   // initialized lazily, once per JVM, not once per partition.
>>>>>   object ryaConnection {
>>>>>     val vf = new ValueFactoryImpl()
>>>>>     val store = new RdfCloudTripleStore()
>>>>>     val conf = new AccumuloRdfConfiguration()
>>>>>     conf.addResource(new File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/hdfs-site.xml").toURI.toURL)
>>>>>     conf.addResource(new File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/core-site.xml").toURI.toURL)
>>>>>     conf.addResource(new File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/accumulo-site.xml").toURI.toURL)
>>>>>     //conf.addResource(new File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/client.conf").toURI.toURL)
>>>>>     val dao = new AccumuloRyaDAO()
>>>>>     val pass = new PasswordToken("accumulo")
>>>>>     val connector: Connector =
>>>>>       new ZooKeeperInstance("hdp-accumulo-instance", "sandbox-hdp.hortonworks.com:2181")
>>>>>         .getConnector("root", pass)
>>>>>     System.out.println("got Connector")
>>>>>     dao.setConnector(connector)
>>>>>     conf.setTablePrefix("rya_")
>>>>>     dao.setConf(conf)
>>>>>     store.setRyaDAO(dao)
>>>>>     val myRepository = new RyaSailRepository(store)
>>>>>     myRepository.initialize()
>>>>>
>>>>>     def getConnection: SailRepositoryConnection = myRepository.getConnection
>>>>>   }
>>>>> }
>>>>>
>>>>> Jorge
>>>>
>>>> <logs.zip>
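Independent of the flush setting, grouping the adds into explicit transactions should also cut the per-triple commit overhead. A minimal sketch for the foreachPartition body in the code above, assuming a Sesame version where RepositoryConnection exposes begin()/commit(); the batch size of 10000 is an arbitrary example to tune:

    // Sketch: commit statements in batches instead of one implicit
    // transaction per add. begin()/commit() come from the openrdf
    // RepositoryConnection API; statements is the Iterator[Statement]
    // built in the foreachPartition body above.
    val conn = ryaConnection.getConnection
    try {
      statements.grouped(10000).foreach { batch =>
        conn.begin()
        batch.foreach(conn.add(_))
        conn.commit()
      }
    } finally {
      conn.close()
    }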
