Yes.  There is a config parameter on the AccumuloRdfConfiguration that
specifies whether or not to flush after every insert.  If you set this to
"false", the ingest times should improve.  If it is set to true, the flush
method on the Accumulo MultiTableBatchWriter is called after each triple
is inserted.
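
For reference, here is a minimal sketch of that change, based on the
configuration setup in your code.  It assumes the setter is called
setFlush, which is how I remember it; please check the
AccumuloRdfConfiguration class in your Rya version, since the name may
differ.  The RyaConf object and its build method are just illustrative
names.

```scala
import org.apache.rya.accumulo.AccumuloRdfConfiguration

// Hypothetical helper; only the setFlush call is the point here.
object RyaConf {
  def build(): AccumuloRdfConfiguration = {
    val conf = new AccumuloRdfConfiguration()
    conf.setTablePrefix("rya_")
    // Assumed setter for the flush-on-insert option:
    // false = do not flush the batch writer after each triple.
    conf.setFlush(false)
    conf
  }
}
```

The rest of the setup (dao.setConf(conf), store.setRyaDAO(dao)) would
stay as it is.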


On Tue, Feb 13, 2018 at 9:04 AM, Jorge Machado <jom...@me.com> wrote:

> From Accumulo BatchWriters ?
>
> Jorge Machado
> www.jmachado.me
>
>
>
>
>
> > On 13 Feb 2018, at 13:13, Puja Valiyil <puja...@gmail.com> wrote:
> >
> > Hey Jorge,
> > There is a config value for flushing on insert; make sure that is set to
> > false.  If it is set to true, the writer to Accumulo will flush after every
> > triple, which would slow performance down.
> > Hope this helps!
> > Thanks,
> > Puja
> >
> > Sent from my iPhone
> >
> >> On Feb 13, 2018, at 3:43 AM, Jorge Machado <jom...@me.com> wrote:
> >>
> >>
> >>
> >> Hi Guys,
> >>
> >> I just gave Rya a test drive from a Spark job on AWS with 5
> >> Accumulo instances.
> >> The performance is really slow: I am getting only 2000 records per
> >> second.
> >> Each commit to Accumulo takes roughly 20 ms.
> >>
> >> Is there any trick here, or did I miss something?
> >>
> >> Here is my code (I changed the code to run on aws of course):
> >>> package template.spark
> >>>
> >>> import java.io.File
> >>>
> >>> import org.apache.accumulo.core.client.{ClientConfiguration, Connector, ZooKeeperInstance}
> >>> import org.apache.accumulo.core.client.security.tokens.PasswordToken
> >>> import org.apache.rya.accumulo.{AccumuloRdfConfiguration, AccumuloRyaDAO}
> >>> import org.apache.rya.rdftriplestore.{RdfCloudTripleStore, RyaSailRepository}
> >>> import org.openrdf.model.Resource
> >>> import org.openrdf.model.impl.ValueFactoryImpl
> >>> import org.openrdf.repository.sail.SailRepositoryConnection
> >>>
> >>>
> >>> final case class Person(firstName: String, lastName: String,
> >>>                        country: String, age: Int)
> >>>
> >>> object Main extends InitSpark {
> >>>  def main(args: Array[String]) = {
> >>>    import spark.implicits._
> >>>
> >>>    val version = spark.version
> >>>    val dataset = spark.read.parquet("/Users/jorge/Downloads/test-d4852e42712.gz.parquet")
> >>>      dataset.foreachPartition(itr => {
> >>>          System.out.println(System.currentTimeMillis()+ " Starting to get connector")
> >>>          val conn = ryaConnection.getConnection
> >>>          val a = itr.flatMap(row=>{
> >>>              row.schema.map(field=>{
> >>>                  val any  = row.get(row.schema.fieldIndex(field.name))
> >>>                  val subject = ryaConnection.vf.createURI("urn:fdc:gfk.com:19980923:mySubject")
> >>>                  val predicate = ryaConnection.vf.createURI("urn:fdc:gfk.com:"+field.name)
> >>>                  val obj = ryaConnection.vf.createLiteral(any.toString)
> >>>                  ryaConnection.vf.createStatement(subject, predicate,obj)
> >>>              })
> >>>          })
> >>>          System.out.println(System.currentTimeMillis()+ " Start writing data")
> >>>          a.foreach(conn.add(_))
> >>>          System.out.println("Finished Partition")
> >>>          conn.close()
> >>>      })
> >>>  }
> >>>
> >>>
> >>>    object ryaConnection{
> >>>        val vf = new ValueFactoryImpl()
> >>>        val store = new RdfCloudTripleStore()
> >>>        val conf = new AccumuloRdfConfiguration()
> >>>        conf.addResource(new File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/hdfs-site.xml").toURI.toURL)
> >>>        conf.addResource(new File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/core-site.xml").toURI.toURL)
> >>>        conf.addResource(new File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/accumulo-site.xml").toURI.toURL)
> >>>        //conf.addResource(new File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/client.conf").toURI.toURL)
> >>>        val dao = new AccumuloRyaDAO()
> >>>        val pass = new PasswordToken("accumulo")
> >>>        val connector: Connector = new ZooKeeperInstance("hdp-accumulo-instance", "sandbox-hdp.hortonworks.com:2181")
> >>>                .getConnector("root",pass)
> >>>        System.out.println("got Connector")
> >>>        dao.setConnector(connector)
> >>>        conf.setTablePrefix("rya_")
> >>>        dao.setConf(conf)
> >>>        store.setRyaDAO(dao)
> >>>        val myRepository = new RyaSailRepository(store)
> >>>        myRepository.initialize()
> >>>
> >>>        def getConnection: SailRepositoryConnection ={
> >>>             myRepository.getConnection
> >>>        }
> >>>
> >>>    }
> >>>
> >>> }
> >>> Jorge
> >>>
> >>>
> >> <logs.zip>
> >>>
> >>>
> >>>
> >>
>
>
