From Accumulo BatchWriters? — Jorge Machado www.jmachado.me
> On 13 Feb 2018, at 13:13, Puja Valiyil <puja...@gmail.com> wrote: > > Hey Jorge, > There is a config value for flushing on insert— make sure that is set to > false. If it is set to true, the writer to accumulo will flush after every > triple which would slow performance down. > Hope this helps! > Thanks, > Puja > > Sent from my iPhone > >> On Feb 13, 2018, at 3:43 AM, Jorge Machado <jom...@me.com> wrote: >> >> >> >> Hi Guys, >> >> I just gave it a test drive to rya over a spark job on aws with 5 accumulo >> instances. >> the performance is really really slow getting only 2000 records per second. >> each commit to accumulo takes roughly 20ms >> >> Any kind of trick here or did I miss something? >> >> Here is my code (I changed the code to run on aws of course): >>> package template.spark >>> >>> import java.io.File >>> >>> import org.apache.accumulo.core.client.{ClientConfiguration, Connector, >>> ZooKeeperInstance} >>> import org.apache.accumulo.core.client.security.tokens.PasswordToken >>> import org.apache.rya.accumulo.{AccumuloRdfConfiguration, AccumuloRyaDAO} >>> import org.apache.rya.rdftriplestore.{RdfCloudTripleStore, >>> RyaSailRepository} >>> import org.openrdf.model.Resource >>> import org.openrdf.model.impl.ValueFactoryImpl >>> import org.openrdf.repository.sail.SailRepositoryConnection >>> >>> >>> final case class Person(firstName: String, lastName: String, >>> country: String, age: Int) >>> >>> object Main extends InitSpark { >>> def main(args: Array[String]) = { >>> import spark.implicits._ >>> >>> val version = spark.version >>> val dataset = >>> spark.read.parquet("/Users/jorge/Downloads/test-d4852e42712.gz.parquet") >>> dataset.foreachPartition(itr => { >>> System.out.println(System.currentTimeMillis()+ " Starting to get >>> connector") >>> val conn = ryaConnection.getConnection >>> val a = itr.flatMap(row=>{ >>> row.schema.map(field=>{ >>> val any = row.get(row.schema.fieldIndex(field.name)) >>> val subject = >>> 
ryaConnection.vf.createURI("urn:fdc:gfk.com:19980923:mySubject") >>> val predicate = >>> ryaConnection.vf.createURI("urn:fdc:gfk.com:"+field.name) >>> val obj = ryaConnection.vf.createLiteral(any.toString) >>> ryaConnection.vf.createStatement(subject,predicate,obj) >>> }) >>> }) >>> System.out.println(System.currentTimeMillis()+ " Start writting >>> data") >>> a.foreach(conn.add(_)) >>> System.out.println("Finished Partition") >>> conn.close() >>> }) >>> } >>> >>> >>> object ryaConnection{ >>> val vf = new ValueFactoryImpl() >>> val store = new RdfCloudTripleStore() >>> val conf = new AccumuloRdfConfiguration() >>> conf.addResource(new >>> File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/hdfs-site.xml").toURI.toURL) >>> conf.addResource(new >>> File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/core-site.xml").toURI.toURL) >>> conf.addResource(new >>> File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/accumulo-site.xml").toURI.toURL) >>> //conf.addResource(new >>> File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/client.conf").toURI.toURL) >>> val dao = new AccumuloRyaDAO() >>> val pass = new PasswordToken("accumulo") >>> val connector: Connector = new >>> ZooKeeperInstance("hdp-accumulo-instance", >>> "sandbox-hdp.hortonworks.com:2181") >>> .getConnector("root",pass) >>> System.out.println("got Connector") >>> dao.setConnector(connector) >>> conf.setTablePrefix("rya_") >>> dao.setConf(conf) >>> store.setRyaDAO(dao) >>> val myRepository = new RyaSailRepository(store) >>> myRepository.initialize() >>> >>> def getConnection: SailRepositoryConnection ={ >>> myRepository.getConnection >>> } >>> >>> } >>> >>> } >>> Jorge >>> >>> >> <logs.zip> >>> >>> >>> >>