sbarnoud commented on issue #343: HBASE-22634 : Improve performance of BufferedMutator
URL: https://github.com/apache/hbase/pull/343#issuecomment-507205600

A Spark job with 7 executors, 4 cores each, reaches an average throughput of more than 500,000 inserts/s with optimal tuning of:

- hbase.client.write.buffer
- hbase.htable.threads.max
- hbase.client.max.total.tasks
- hbase.client.max.perserver.tasks
- hbase.client.ipc.pool.size
- hbase.client.rpc.compressor (GzipCodec.class.getCanonicalName() seems to always be the best)

Here the input dataset is well balanced on the row key, the table is pre-split across 26 region servers, and the records are tiny (500 bytes on average). The source of the Spark job is as simple as:

```
df.toJavaRDD().foreachPartition(rows -> {
  // Log puts whose retries were exhausted.
  BufferedMutator.ExceptionListener listener = (e, mutator) -> {
    for (int i = 0; i < e.getNumExceptions(); i++) {
      log.warn("Failed to send put: " + e.getRow(i));
    }
  };

  Configuration hadoopConf = HBaseConfiguration.create();
  hadoopConf.set("hbase.client.write.buffer", options.getWriteBufferSize());
  hadoopConf.set("hbase.client.max.perrequest.heapsize", options.getWriteBufferSize());
  hadoopConf.set("hbase.htable.threads.keepalivetime", "60");
  hadoopConf.set("hbase.client.max.total.tasks", options.getMaxTotalTasks());
  hadoopConf.set("hbase.client.max.perserver.tasks", options.getMaxServerTasks());
  hadoopConf.set("hbase.client.max.perregion.tasks", options.getMaxRegionTasks());
  hadoopConf.set("hbase.htable.threads.max", options.getPoolSize());
  hadoopConf.set("hbase.hconnection.threads.max", "1");
  hadoopConf.set("hbase.hconnection.threads.core", "1");
  hadoopConf.set("hbase.hconnection.meta.lookup.threads.max", "5");
  hadoopConf.set("hbase.hconnection.meta.lookup.threads.core", "1");
  hadoopConf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
  hadoopConf.set("hbase.client.ipc.pool.size", options.getIpcPoolSize());

  // Use the same connection in each executor core
  Connection connection = HBaseConnectionCache.getConnection(hadoopConf).connection();
  TableName tableName = TableName.valueOf(options.getTable());

  // If streaming, please modify that to reuse the pool.
  BufferedMutatorThreadPoolExecutor pool = BufferedMutatorThreadPoolExecutor.getPoolExecutor(hadoopConf);

  BufferedMutatorParams mutatorParams = new BufferedMutatorParams(tableName)
      .listener(listener)
      .pool(pool);
  if (options.getPeriodicFlushTimeout() > 0) {
    mutatorParams = mutatorParams.setWriteBufferPeriodicFlushTimeoutMs(options.getPeriodicFlushTimeout());
  }

  try (BufferedMutatorImpl mutator = (BufferedMutatorImpl) connection.getBufferedMutator(mutatorParams)) {
    Iterable<Row> documents = () -> rows;
    for (Row doc : documents) {
      String rowKey = doc.getString(0);
      Put put = new Put(Bytes.toBytes(rowKey));
      String value = doc.getString(2);
      put.addColumn(Bytes.toBytes("valeur"), Bytes.toBytes(doc.getInt(1)), Bytes.toBytes(value == null ? "" : value));
      value = doc.getString(3);
      put.addColumn(Bytes.toBytes("valeur"), Bytes.toBytes("filler"), Bytes.toBytes(value == null ? "" : value));
      mutator.mutate(put);
    }
    // If streaming: final flush to make sure everything is stored at the end of a micro-batch.
    // In batch, the close() will flush().
    mutator.flush();
  } catch (Exception e) {
    log.info("Got exception ", e);
  }
});
```
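
For anyone who wants to try the same knobs outside Spark and without the custom `BufferedMutatorThreadPoolExecutor` from the snippet above, here is a minimal self-contained sketch that only uses the stock client API. The table name, column family, loop and property values are placeholders for illustration, not the values used in the benchmark.

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;

public class BufferedMutatorTuningSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Tuning knobs from the list above; values here are placeholders to adjust per workload.
    conf.set("hbase.client.write.buffer", String.valueOf(8 * 1024 * 1024));
    conf.set("hbase.htable.threads.max", "16");
    conf.set("hbase.client.max.total.tasks", "200");
    conf.set("hbase.client.max.perserver.tasks", "10");
    conf.set("hbase.client.ipc.pool.size", "4");
    conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());

    // Report puts whose retries were exhausted.
    BufferedMutator.ExceptionListener listener = (e, mutator) -> {
      for (int i = 0; i < e.getNumExceptions(); i++) {
        System.err.println("Failed to send put: " + e.getRow(i));
      }
    };

    TableName table = TableName.valueOf("test_table"); // placeholder table name
    byte[] family = Bytes.toBytes("valeur");

    try (Connection connection = ConnectionFactory.createConnection(conf);
         BufferedMutator mutator =
             connection.getBufferedMutator(new BufferedMutatorParams(table).listener(listener))) {
      for (int i = 0; i < 1000; i++) {
        Put put = new Put(Bytes.toBytes("row-" + i));
        put.addColumn(family, Bytes.toBytes("filler"), Bytes.toBytes("value-" + i));
        mutator.mutate(put);
      }
      // close() also flushes, but an explicit flush marks the end of the batch.
      mutator.flush();
    }
  }
}
```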
