sbarnoud commented on issue #343: HBASE-22634: Improve performance of BufferedMutator
URL: https://github.com/apache/hbase/pull/343#issuecomment-507205600
 
 
   A Spark job with 7 executors, 4 cores each, reaches an average throughput of more than 500,000 inserts/s with optimal tuning of:
   
   - hbase.client.write.buffer
   - hbase.htable.threads.max
   - hbase.client.max.total.tasks
   - hbase.client.max.perserver.tasks
   - hbase.client.ipc.pool.size
   - hbase.client.rpc.compressor (GzipCodec.class.getCanonicalName() consistently gives the best results)
   
   The input dataset is well balanced on the rowkey, the table is pre-split across 26 region servers, and the records are small (about 500 bytes on average).
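
   The pre-split itself can be scripted once, up front, with the standard Admin API. The sketch below is only illustrative: the table name, the column family and the 26 alphabetical split points are assumptions, not taken from the actual setup.

   ```
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.hbase.HBaseConfiguration;
   import org.apache.hadoop.hbase.TableName;
   import org.apache.hadoop.hbase.client.Admin;
   import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
   import org.apache.hadoop.hbase.client.Connection;
   import org.apache.hadoop.hbase.client.ConnectionFactory;
   import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
   import org.apache.hadoop.hbase.util.Bytes;

   public class PreSplitTable {
     public static void main(String[] args) throws Exception {
       Configuration conf = HBaseConfiguration.create();
       try (Connection connection = ConnectionFactory.createConnection(conf);
            Admin admin = connection.getAdmin()) {
         // Illustrative split strategy: 25 split keys give 26 initial regions,
         // which only spreads the load evenly if the rowkeys are uniformly
         // distributed over a..z.
         byte[][] splitKeys = new byte[25][];
         for (char c = 'b'; c <= 'z'; c++) {
           splitKeys[c - 'b'] = Bytes.toBytes(String.valueOf(c));
         }
         // Hypothetical table and family names, for illustration only.
         admin.createTable(
             TableDescriptorBuilder.newBuilder(TableName.valueOf("my_table"))
                 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("valeur"))
                 .build(),
             splitKeys);
       }
     }
   }
   ```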
   
   The source of the Spark job is as simple as:
   
   ```
   df.toJavaRDD().foreachPartition(rows -> {
     BufferedMutator.ExceptionListener listener = (e, mutator) -> {
       for (int i = 0; i < e.getNumExceptions(); i++) {
         log.warn("Failed to send put: " + e.getRow(i));
       }
     };

     Configuration hadoopConf = HBaseConfiguration.create();
     hadoopConf.set("hbase.client.write.buffer", options.getWriteBufferSize());
     hadoopConf.set("hbase.client.max.perrequest.heapsize", options.getWriteBufferSize());
     hadoopConf.set("hbase.htable.threads.keepalivetime", "60");
     hadoopConf.set("hbase.client.max.total.tasks", options.getMaxTotalTasks());
     hadoopConf.set("hbase.client.max.perserver.tasks", options.getMaxServerTasks());
     hadoopConf.set("hbase.client.max.perregion.tasks", options.getMaxRegionTasks());
     hadoopConf.set("hbase.htable.threads.max", options.getPoolSize());
     hadoopConf.set("hbase.hconnection.threads.max", "1");
     hadoopConf.set("hbase.hconnection.threads.core", "1");
     hadoopConf.set("hbase.hconnection.meta.lookup.threads.max", "5");
     hadoopConf.set("hbase.hconnection.meta.lookup.threads.core", "1");
     hadoopConf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
     hadoopConf.set("hbase.client.ipc.pool.size", options.getIpcPoolSize());

     // Use the same connection in each executor core
     Connection connection = HBaseConnectionCache.getConnection(hadoopConf).connection();

     TableName tableName = TableName.valueOf(options.getTable());

     // If streaming, modify this to reuse the pool across micro-batches.
     BufferedMutatorThreadPoolExecutor pool =
         BufferedMutatorThreadPoolExecutor.getPoolExecutor(hadoopConf);

     BufferedMutatorParams mutatorParams = new BufferedMutatorParams(tableName)
         .listener(listener)
         .pool(pool);

     if (options.getPeriodicFlushTimeout() > 0) {
       mutatorParams = mutatorParams
           .setWriteBufferPeriodicFlushTimeoutMs(options.getPeriodicFlushTimeout());
     }

     try (BufferedMutatorImpl mutator =
         (BufferedMutatorImpl) connection.getBufferedMutator(mutatorParams)) {

       Iterable<Row> documents = () -> rows;
       for (Row doc : documents) {
         String rowKey = doc.getString(0);

         Put put = new Put(Bytes.toBytes(rowKey));
         String value = doc.getString(2);
         put.addColumn(Bytes.toBytes("valeur"), Bytes.toBytes(doc.getInt(1)),
             Bytes.toBytes(value == null ? "" : value));
         value = doc.getString(3);
         put.addColumn(Bytes.toBytes("valeur"), Bytes.toBytes("filler"),
             Bytes.toBytes(value == null ? "" : value));

         mutator.mutate(put);
       }
       // If streaming: final flush to make sure everything is stored at the end
       // of a micro-batch. In batch mode, close() will flush().
       mutator.flush();
     } catch (Exception e) {
       log.info("Got exception ", e);
     }
   });
   ```
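
   The two "If streaming" comments hint that, for a streaming job, the connection, pool and mutator should be reused across micro-batches rather than rebuilt for every partition. A minimal sketch of one way to do that, assuming the same helper classes as above (HBaseConnectionCache, BufferedMutatorThreadPoolExecutor), could look like this:

   ```
   // Illustrative only: cache a single BufferedMutator per executor JVM so that
   // successive micro-batches reuse the same pool and connection. Relies on the
   // helper classes used in the snippet above (import them from wherever the
   // job already defines them).
   import java.io.IOException;

   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.hbase.TableName;
   import org.apache.hadoop.hbase.client.BufferedMutator;
   import org.apache.hadoop.hbase.client.BufferedMutatorParams;
   import org.apache.hadoop.hbase.client.Connection;

   public class MutatorCache {
     private static BufferedMutator mutator;

     public static synchronized BufferedMutator get(Configuration hadoopConf,
         TableName tableName, BufferedMutator.ExceptionListener listener) throws IOException {
       if (mutator == null) {
         Connection connection = HBaseConnectionCache.getConnection(hadoopConf).connection();
         BufferedMutatorParams params = new BufferedMutatorParams(tableName)
             .listener(listener)
             .pool(BufferedMutatorThreadPoolExecutor.getPoolExecutor(hadoopConf));
         mutator = connection.getBufferedMutator(params);
       }
       return mutator;
     }
   }
   ```

   With a cache like this, the try-with-resources in foreachPartition would be replaced by a plain MutatorCache.get(...), the per-record mutate() calls stay the same, and only flush() is called at the end of each partition; the mutator is never closed between micro-batches.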
