Just tried it out on my PC and locally I get 150000 entries per second. Which is good !!!
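In case it helps anyone else reading the archive later: the change that made the difference for me was disabling the per-statement flush on the configuration before wiring up the DAO. A minimal sketch of what I mean (I am assuming the setter is `setFlush` on AccumuloRdfConfiguration here; check the method name against your Rya version):

    val conf = new AccumuloRdfConfiguration()
    conf.setTablePrefix("rya_")
    // Assumed setter: with flush disabled, the underlying Accumulo
    // MultiTableBatchWriter is no longer flushed after every single
    // triple, so statements get batched until the writer itself
    // decides to flush (or the connection is closed).
    conf.setFlush(false)
    dao.setConf(conf)

With flushing on, every insert pays a round trip to the tablet servers, which matches the ~20 ms per commit I was seeing before.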
Jorge Machado
www.jmachado.me

> On 13 Feb 2018, at 15:57, Jorge Machado <[email protected]> wrote:
>
> Thanks ! Will try that out.
>
>> On 13 Feb 2018, at 15:10, Puja Valiyil <[email protected]> wrote:
>>
>> Yes. There is a config parameter on the AccumuloRdfConfiguration that
>> specifies whether or not to flush after every insert. If you set this to
>> "false", then the ingest times should improve. If set to true, the flush
>> method on the Accumulo MultiTableBatchWriter is called after each
>> triple is inserted.
>>
>> On Tue, Feb 13, 2018 at 9:04 AM, Jorge Machado <[email protected]> wrote:
>>
>>> From the Accumulo BatchWriters?
>>>
>>> Jorge Machado
>>> www.jmachado.me
>>>
>>>> On 13 Feb 2018, at 13:13, Puja Valiyil <[email protected]> wrote:
>>>>
>>>> Hey Jorge,
>>>> There is a config value for flushing on insert -- make sure that is set
>>>> to false. If it is set to true, the writer to Accumulo will flush after
>>>> every triple, which would slow performance down.
>>>> Hope this helps!
>>>> Thanks,
>>>> Puja
>>>>
>>>> Sent from my iPhone
>>>>
>>>>> On Feb 13, 2018, at 3:43 AM, Jorge Machado <[email protected]> wrote:
>>>>>
>>>>> Hi Guys,
>>>>>
>>>>> I just gave Rya a test drive from a Spark job on AWS with 5 Accumulo
>>>>> instances. The performance is really, really slow: I am getting only
>>>>> 2000 records per second, and each commit to Accumulo takes roughly 20 ms.
>>>>>
>>>>> Any kind of trick here, or did I miss something?
>>>>>
>>>>> Here is my code (I changed the code to run on AWS, of course):
>>>>>
>>>>>> package template.spark
>>>>>>
>>>>>> import java.io.File
>>>>>>
>>>>>> import org.apache.accumulo.core.client.{ClientConfiguration, Connector, ZooKeeperInstance}
>>>>>> import org.apache.accumulo.core.client.security.tokens.PasswordToken
>>>>>> import org.apache.rya.accumulo.{AccumuloRdfConfiguration, AccumuloRyaDAO}
>>>>>> import org.apache.rya.rdftriplestore.{RdfCloudTripleStore, RyaSailRepository}
>>>>>> import org.openrdf.model.Resource
>>>>>> import org.openrdf.model.impl.ValueFactoryImpl
>>>>>> import org.openrdf.repository.sail.SailRepositoryConnection
>>>>>>
>>>>>> final case class Person(firstName: String, lastName: String,
>>>>>>                         country: String, age: Int)
>>>>>>
>>>>>> object Main extends InitSpark {
>>>>>>   def main(args: Array[String]) = {
>>>>>>     import spark.implicits._
>>>>>>
>>>>>>     val version = spark.version
>>>>>>     val dataset = spark.read.parquet("/Users/jorge/Downloads/test-d4852e42712.gz.parquet")
>>>>>>     dataset.foreachPartition(itr => {
>>>>>>       System.out.println(System.currentTimeMillis() + " Starting to get connector")
>>>>>>       val conn = ryaConnection.getConnection
>>>>>>       val a = itr.flatMap(row => {
>>>>>>         row.schema.map(field => {
>>>>>>           val any = row.get(row.schema.fieldIndex(field.name))
>>>>>>           val subject = ryaConnection.vf.createURI("urn:fdc:gfk.com:19980923:mySubject")
>>>>>>           val predicate = ryaConnection.vf.createURI("urn:fdc:gfk.com:" + field.name)
>>>>>>           val obj = ryaConnection.vf.createLiteral(any.toString)
>>>>>>           ryaConnection.vf.createStatement(subject, predicate, obj)
>>>>>>         })
>>>>>>       })
>>>>>>       System.out.println(System.currentTimeMillis() + " Start writing data")
>>>>>>       a.foreach(conn.add(_))
>>>>>>       System.out.println("Finished Partition")
>>>>>>       conn.close()
>>>>>>     })
>>>>>>   }
>>>>>>
>>>>>>   object ryaConnection {
>>>>>>     val vf = new ValueFactoryImpl()
>>>>>>     val store = new RdfCloudTripleStore()
>>>>>>     val conf = new AccumuloRdfConfiguration()
>>>>>>     conf.addResource(new File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/hdfs-site.xml").toURI.toURL)
>>>>>>     conf.addResource(new File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/core-site.xml").toURI.toURL)
>>>>>>     conf.addResource(new File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/accumulo-site.xml").toURI.toURL)
>>>>>>     //conf.addResource(new File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/client.conf").toURI.toURL)
>>>>>>     val dao = new AccumuloRyaDAO()
>>>>>>     val pass = new PasswordToken("accumulo")
>>>>>>     val connector: Connector =
>>>>>>       new ZooKeeperInstance("hdp-accumulo-instance", "sandbox-hdp.hortonworks.com:2181")
>>>>>>         .getConnector("root", pass)
>>>>>>     System.out.println("got Connector")
>>>>>>     dao.setConnector(connector)
>>>>>>     conf.setTablePrefix("rya_")
>>>>>>     dao.setConf(conf)
>>>>>>     store.setRyaDAO(dao)
>>>>>>     val myRepository = new RyaSailRepository(store)
>>>>>>     myRepository.initialize()
>>>>>>
>>>>>>     def getConnection: SailRepositoryConnection = {
>>>>>>       myRepository.getConnection
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> Jorge
>>>>>
>>>>> <logs.zip>
