This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 4999c30d1 KUDU-1973: Send no-op heartbeat operations batched - PART 1
4999c30d1 is described below

commit 4999c30d1fc04123d19258ebda5478303d8e6644
Author: Zoltan Martonka <[email protected]>
AuthorDate: Fri Aug 22 15:25:26 2025 +0000

    KUDU-1973: Send no-op heartbeat operations batched - PART 1
    
    Due to periodically sent heartbeat messages, a Kudu cluster
    with thousands of tablets still consumes significant CPU and
    network resources, even without any user activity. When
    multiple messages are sent to the same host within a short
    time frame, they can be batched to reduce CPU usage.
    
    This results in fewer RPC calls, and some fields can also be
    shared between the no-op messages, further reducing network
    traffic.
    
    Processing heartbeats for the same host together also reduces
    the CPU load. Only the periodic heartbeats are batched; the
    leadership no-op heartbeats are still sent unbatched, in a
    separate task. Batching only the periodic no-op heartbeats
    allows us to process the batch request on a single thread,
    since an empty periodic update does not take much time.
    
    The shared timer submits a single shared task for the
    heartbeat senders that are due to beat. However, if it
    encounters a consensus peer with actual messages, it leaves
    that peer out and submits a separate task on the RPC threads
    using the consensus peer's RPC token. This ensures that the
    other no-ops in the batch are sent out in a timely manner.
    
    Using this method instead of buffering and periodically
    flushing the buffer has two significant advantages:
    + If X heartbeats are batched together, instead of X timer
      calls and X further task submissions to RPC threads, we will
      have 1 timer call and [1, X+1] callbacks on the RPC thread -
      usually just 1-2 callbacks.
    + Since there is no buffering, when a write request arrives,
      there is no need for flush/discard logic.
    
    Note that if some of the responses in the batch trigger
    further updates, ProcessResponse calls DoProcessResponse on a
    separate thread. So processing the response on a single thread
    is fine as well; there is no need to change that logic.
    
    Measurement:
    
    I started a 1 master + 4 TS setup using t3.xlarge instances.
    Created 200 tables with 10 hash partitions (RF=3 and 1,000
    initial rows). Then performed the following random sampling:
    
    For 1 tablet server (since the tablets are newly created, they
    are distributed evenly):
    + Measure metrics for 40 seconds.
    + Also call:
        sudo perf stat -e sched:sched_* -p <tserver-pid> sleep 40
      before starting the write workload.
    + Right after the start of the 40-second window, start
      0/3/6/.../24 separate write tasks (from 5 different VMs),
      each writing 1 million rows into a random table.
      (Randomly selected, so the short-term history of load is
      "the same" for any value after the first few measurements.)
    
    I started the cluster with 3 configurations (always starting
    with a fresh cluster):
      1. Batching off
      2. Batching on, max batch size = 10
      3. Batching on, max batch size = 30
    
    The results were the following ("client task runtimes" is the
    execution time of the binary writing the rows using
    kudu_client.so):
    
    ========== write count: 0:==========
    Runs with batching off: 15, 10: 19, 30: 18
        Off client task runtimes avg: n/a, min n/a, max n/a, med n/a
        10  client task runtimes avg: n/a, min n/a, max n/a, med n/a
        30  client task runtimes avg: n/a, min n/a, max n/a, med n/a
        Change (10 & 30): n/a% n/a%
        Off cpu_stime avg: 13568.533333333333, min 9419, max 16947, med 13836
        10  cpu_stime avg: 6807.368421052632, min 4772, max 8788, med 6899
        30  cpu_stime avg: 5833.722222222223, min 3279, max 7128, med 6045.0
        Change (10 & 30): -49.829740224544295% -57.005506203896594%
        Off cpu_utime avg: 40332.86666666667, min 19340, max 50660, med 46046
        10  cpu_utime avg: 28121.105263157893, min 8846, max 41993, med 27630
        30  cpu_utime avg: 26349.055555555555, min 9592, max 39603, med 26169.0
        Change (10 & 30): -30.277444706406786% -34.67100721275563%
        Off rpc_incoming_queue_time avg: 85605.4, min 85252, max 85767, med 
85666
        10  rpc_incoming_queue_time avg: 8997.263157894737, min 8946, max 9031, 
med 8997
        30  rpc_incoming_queue_time avg: 3497.0555555555557, min 3470, max 
3532, med 3496.5
        Change (10 & 30): -89.4898415778739% -95.91491242894075%
        Off utime+stime avg: 53901.4, min 32665, max 65281, med 56304
        10  utime+stime avg: 34928.47368421053, min 14796, max 49508, med 35016
        30  utime+stime avg: 32182.777777777777, min 15019, max 46496, med 
31922.0
        Change (10 & 30): -35.19932008405993% -40.29324325939998%
        Off stat_runtime avg: 54531223159.181816, min 31976380571, max 
65955396580, med 63846401240
        10  stat_runtime avg: 32422918329.933334, min 15178573784, max 
49774116201, med 31081119380
        30  stat_runtime avg: 31854018849.733334, min 15354712860, max 
46880459417, med 33664261243
        Change (10 & 30): -40.5424700720764% -41.58572464668833%
        Off switch avg: 642389.0, min 623210, max 653324, med 649967
        10  switch avg: 261494.33333333334, min 219716, max 270616, med 264348
        30  switch avg: 249353.33333333334, min 247260, max 252089, med 249239
        Change (10 & 30): -59.29346029690213% -61.18343661965984%
    ========== write count: 3:==========
    Runs with batching off: 28, 10: 23, 30: 13
        Off client task runtimes avg: 6.203379134337108, min 5.632460594177246, 
max 6.844575881958008, med 6.185922980308533
        10  client task runtimes avg: 6.370108393655307, min 5.922394752502441, 
max 7.963289260864258, med 6.355126142501831
        30  client task runtimes avg: 6.1353193796598, min 5.597020149230957, 
max 7.383537769317627, med 6.066960334777832
        Change (10 & 30): 2.6877167380487066% -1.097140013586817%
        Off cpu_stime avg: 14144.035714285714, min 9547, max 16631, med 15466.5
        10  cpu_stime avg: 7969.0, min 4657, max 10297, med 8943
        30  cpu_stime avg: 5950.0, min 3908, max 8000, med 6528
        Change (10 & 30): -43.6582305009936% -57.93279852941547%
        Off cpu_utime avg: 43743.46428571428, min 28862, max 54178, med 44754.5
        10  cpu_utime avg: 42348.434782608696, min 17163, max 47387, med 44822
        30  cpu_utime avg: 32516.30769230769, min 14061, max 44001, med 35589
        Change (10 & 30): -3.1891152790136323% -25.665906385638394%
        Off rpc_incoming_queue_time avg: 89446.75, min 88955, max 89839, med 
89493.5
        10  rpc_incoming_queue_time avg: 13017.217391304348, min 12267, max 
13414, med 13063
        30  rpc_incoming_queue_time avg: 7832.2307692307695, min 7498, max 
8057, med 7876
        Change (10 & 30): -85.44696437678915% -91.24369441122145%
        Off utime+stime avg: 57887.5, min 40922, max 70702, med 58746.5
        10  utime+stime avg: 50317.434782608696, min 24283, max 56899, med 53239
        30  utime+stime avg: 38466.307692307695, min 20589, max 50935, med 43039
        Change (10 & 30): -13.077201843906373% -33.54988954038834%
        Off stat_runtime avg: 57709248396.1579, min 42352963849, max 
69618765193, med 55313027027
        10  stat_runtime avg: 49429512557.3125, min 20883553890, max 
54484284473, med 52754845564.0
        30  stat_runtime avg: 38284725561.85714, min 20290863097, max 
50310539913, med 41356308570
        Change (10 & 30): -14.347329187182133% -33.65928923724118%
        Off switch avg: 673106.2631578947, min 661281, max 691078, med 673573
        10  switch avg: 291551.25, min 275920, max 297571, med 293681.5
        30  switch avg: 272828.85714285716, min 268207, max 278893, med 272300
        Change (10 & 30): -56.685702398284036% -59.46719380341614%
    ========== write count: 6:==========
    Runs with batching off: 28, 10: 25, 30: 13
        Off client task runtimes avg: 6.88737925745192, min 6.060053825378418, 
max 8.26358151435852, med 6.81439471244812
        10  client task runtimes avg: 6.90557310740153, min 6.297971725463867, 
max 8.041281938552856, med 6.896089553833008
        30  client task runtimes avg: 6.4807085410142555, min 
6.009506940841675, max 6.913873195648193, med 6.4735718965530396
        Change (10 & 30): 0.26416216197073794% -5.904578523066817%
        Off cpu_stime avg: 13949.535714285714, min 10154, max 18245, med 11736.5
        10  cpu_stime avg: 7253.72, min 5144, max 10370, med 6352
        30  cpu_stime avg: 6419.0, min 4378, max 9172, med 5325
        Change (10 & 30): -48.00027650689859% -53.98413157631974%
        Off cpu_utime avg: 50218.892857142855, min 27029, max 59428, med 54704.0
        10  cpu_utime avg: 43645.04, min 28473, max 52156, med 47802
        30  cpu_utime avg: 38955.230769230766, min 19804, max 46717, med 38318
        Change (10 & 30): -13.09039782267487% -22.429132637299887%
        Off rpc_incoming_queue_time avg: 93353.53571428571, min 92648, max 
93824, med 93383.5
        10  rpc_incoming_queue_time avg: 16530.72, min 10724, max 17280, med 
16793
        30  rpc_incoming_queue_time avg: 11754.0, min 11185, max 12380, med 
11716
        Change (10 & 30): -82.29234717944342% -87.40915391145565%
        Off utime+stime avg: 64168.42857142857, min 37183, max 77202, med 
66883.0
        10  utime+stime avg: 50898.76, min 34228, max 60366, med 54281
        30  utime+stime avg: 45374.230769230766, min 27342, max 54821, med 45185
        Change (10 & 30): -20.679435147235292% -29.288854691645128%
        Off stat_runtime avg: 67253597832.07692, min 44608096874, max 
72681918723, med 69457593188
        10  stat_runtime avg: 49455203107.55556, min 34066502223, max 
57746660961, med 52328569829
        30  stat_runtime avg: 44232893390.8, min 24621447550, max 53333232716, 
med 46963805079
        Change (10 & 30): -26.46459862111992% -34.22969950062223%
        Off switch avg: 694087.7692307692, min 663129, max 713254, med 696097
        10  switch avg: 311993.22222222225, min 279392, max 331949, med 315909
        30  switch avg: 294191.6, min 284201, max 301534, med 298264
        Change (10 & 30): -55.04988906979411% -57.61463995741616%
    ========== write count: 9:==========
    Runs with batching off: 12, 10: 28, 30: 11
        Off client task runtimes avg: 8.548090826582026, min 7.529751539230347, 
max 9.48080325126648, med 8.539497256278992
        10  client task runtimes avg: 8.2908557114147, min 7.11676287651062, 
max 11.546950578689575, med 8.205906629562378
        30  client task runtimes avg: 7.6591967903837865, min 
6.522926092147827, max 8.579655647277832, med 7.697844862937927
        Change (10 & 30): -3.009269793523961% -10.398743464845307%
        Off cpu_stime avg: 15005.166666666666, min 11661, max 19840, med 12412.0
        10  cpu_stime avg: 8288.464285714286, min 5129, max 12050, med 7104.5
        30  cpu_stime avg: 6007.545454545455, min 4793, max 9926, med 5750
        Change (10 & 30): -44.76259764493816% -59.96348732406312%
        Off cpu_utime avg: 59056.666666666664, min 56031, max 63356, med 59377.0
        10  cpu_utime avg: 49433.57142857143, min 26090, max 57382, med 52271.5
        30  cpu_utime avg: 44772.818181818184, min 27573, max 51210, med 50317
        Change (10 & 30): -16.294680653770786% -24.186682539112404%
        Off rpc_incoming_queue_time avg: 96753.83333333333, min 96291, max 
97393, med 96774.5
        10  rpc_incoming_queue_time avg: 21073.464285714286, min 20380, max 
21682, med 21082.0
        30  rpc_incoming_queue_time avg: 16262.272727272728, min 15920, max 
16578, med 16283
        Change (10 & 30): -78.21950453052064% -83.19211532296974%
        Off utime+stime avg: 74061.83333333333, min 67692, max 79069, med 
73660.0
        10  utime+stime avg: 57722.03571428572, min 31219, max 68704, med 
61430.5
        30  utime+stime avg: 50780.36363636364, min 32366, max 60608, med 56080
        Change (10 & 30): -22.06237259278523% -31.435178754198212%
        Off stat_runtime avg: 74426347984.6, min 69044550939, max 76319774349, 
med 75772265736
        10  stat_runtime avg: 60146731951.2, min 56539088994, max 66356993395, 
med 60227311137.5
        30  stat_runtime avg: 58533466007.0, min 58533466007, max 58533466007, 
med 58533466007
        Change (10 & 30): -19.18623769683646% -21.353838268254812%
        Off switch avg: 735295.8, min 709459, max 746328, med 741901
        10  switch avg: 339284.5, min 316234, max 365507, med 333199.5
        30  switch avg: 323555.0, min 323555, max 323555, med 323555
        Change (10 & 30): -53.857413574237746% -55.99662067973189%
    ========== write count: 12:==========
    Runs with batching off: 28, 10: 26, 30: 19
        Off client task runtimes avg: 10.253776160994573, min 
9.013848304748535, max 11.68369174003601, med 10.17331576347351
        10  client task runtimes avg: 10.120543375229223, min 8.6819486618042, 
max 12.503127098083496, med 9.816626071929932
        30  client task runtimes avg: 9.125201345535746, min 6.506812334060669, 
max 10.293994665145874, med 9.27278220653534
        Change (10 & 30): -1.2993533667349566% -11.006431169737564%
        Off cpu_stime avg: 14449.607142857143, min 11600, max 21255, med 12862.5
        10  cpu_stime avg: 8505.76923076923, min 6634, max 12915, med 7722.0
        30  cpu_stime avg: 7201.578947368421, min 5122, max 11262, med 6769
        Change (10 & 30): -41.13494473118685% -50.16072841171763%
        Off cpu_utime avg: 61044.82142857143, min 44149, max 66881, med 62788.5
        10  cpu_utime avg: 56311.92307692308, min 44619, max 64298, med 56596.5
        30  cpu_utime avg: 50936.84210526316, min 27324, max 58087, med 54226
        Change (10 & 30): -7.75315291434887% -16.558291246925204%
        Off rpc_incoming_queue_time avg: 100650.96428571429, min 99736, max 
101318, med 100699.5
        10  rpc_incoming_queue_time avg: 24776.884615384617, min 22855, max 
25639, med 24807.5
        30  rpc_incoming_queue_time avg: 20277.105263157893, min 19619, max 
21074, med 20298
        Change (10 & 30): -75.38336091341226% -79.8540377560636%
        Off utime+stime avg: 75494.42857142857, min 55942, max 86122, med 
76436.0
        10  utime+stime avg: 64817.692307692305, min 51253, max 74834, med 
64551.0
        30  utime+stime avg: 58138.42105263158, min 32446, max 65996, med 61492
        Change (10 & 30): -14.14241615675591% -22.989785931521702%
        Off stat_runtime avg: 76728076228.42857, min 66549332918, max 
82294242231, med 80703935110
        10  stat_runtime avg: 62724548520.2, min 49469140539, max 71325822979, 
med 64903018497
        30  stat_runtime avg: 54956350974.333336, min 45573244914, max 
62202256097, med 57093551912
        Change (10 & 30): -18.250852095572444% -28.375174153041748%
        Off switch avg: 769651.1428571428, min 741196, max 783936, med 777328
        10  switch avg: 364841.8, min 324393, max 403493, med 361469
        30  switch avg: 342820.0, min 312424, max 363162, med 352874
        Change (10 & 30): -52.5964713512133% -55.45774170783868%
    ========== write count: 15:==========
    Runs with batching off: 29, 10: 25, 30: 11
        Off client task runtimes avg: 12.093589447937717, min 
9.694162607192993, max 13.974745273590088, med 11.848691463470459
        10  client task runtimes avg: 12.023993889490763, min 9.89328384399414, 
max 14.774570941925049, med 11.77902603149414
        30  client task runtimes avg: 10.500564222624808, min 
6.7602620124816895, max 12.283529281616211, med 10.966847896575928
        Change (10 & 30): -0.5754747897351642% -13.172476477482565%
        Off cpu_stime avg: 15818.413793103447, min 12330, max 22206, med 13621
        10  cpu_stime avg: 10410.12, min 7513, max 14940, med 8576
        30  cpu_stime avg: 8565.09090909091, min 7139, max 12624, med 7520
        Change (10 & 30): -34.18986166275009% -45.853667623582204%
        Off cpu_utime avg: 64650.724137931036, min 42791, max 69649, med 66646
        10  cpu_utime avg: 60900.88, min 47295, max 68286, med 61477
        30  cpu_utime avg: 57182.27272727273, min 48519, max 60889, med 57494
        Change (10 & 30): -5.8001579842026585% -11.551999626059118%
        Off rpc_incoming_queue_time avg: 104149.72413793103, min 102866, max 
105221, med 104088
        10  rpc_incoming_queue_time avg: 28739.68, min 27077, max 30125, med 
28748
        30  rpc_incoming_queue_time avg: 24547.454545454544, min 23686, max 
25125, med 24814
        Change (10 & 30): -72.40541898897541% -76.43061011573585%
        Off utime+stime avg: 80469.13793103448, min 61847, max 90811, med 80286
        10  utime+stime avg: 71311.0, min 60967, max 82608, med 71005
        30  utime+stime avg: 65747.36363636363, min 60394, max 69941, med 66757
        Change (10 & 30): -11.380932077193862% -18.294932284832033%
        Off stat_runtime avg: 81980607740.77777, min 55653565554, max 
87411163108, med 85442439087
        10  stat_runtime avg: 67729727090.3, min 55481286826, max 78981283495, 
med 67728369751.0
        30  stat_runtime avg: 58272593883.666664, min 51761458017, max 
64954871741, med 58101451893
        Change (10 & 30): -17.383233722222425% -28.91905111520485%
        Off switch avg: 806852.8888888889, min 771841, max 826717, med 809368
        10  switch avg: 405952.4, min 369202, max 436256, med 413122.0
        30  switch avg: 357860.3333333333, min 340666, max 367513, med 365402
        Change (10 & 30): -49.686937285552254% -55.64738773803734%
    ========== write count: 18:==========
    Runs with batching off: 17, 10: 25, 30: 14
        Off client task runtimes avg: 14.400622643676458, min 
12.48328423500061, max 17.490095376968384, med 14.047804951667786
        10  client task runtimes avg: 13.62668704509735, min 
11.661161422729492, max 15.913615226745605, med 13.4107586145401
        30  client task runtimes avg: 12.081949627588665, min 
6.308183908462524, max 13.923750400543213, med 12.532055974006653
        Change (10 & 30): -5.374320386896292% -16.101199742956663%
        Off cpu_stime avg: 16935.58823529412, min 13577, max 22825, med 14263
        10  cpu_stime avg: 11495.4, min 8416, max 16228, med 9082
        30  cpu_stime avg: 9291.07142857143, min 7162, max 14057, med 8078.5
        Change (10 & 30): -32.12281829075564% -45.13877345453734%
        Off cpu_utime avg: 70929.58823529411, min 67674, max 76405, med 70892
        10  cpu_utime avg: 64712.68, min 52765, max 72801, med 65688
        30  cpu_utime avg: 58736.642857142855, min 45035, max 64122, med 59485.0
        Change (10 & 30): -8.764901065928676% -17.190210293768672%
        Off rpc_incoming_queue_time avg: 107683.5294117647, min 106186, max 
108626, med 107884
        10  rpc_incoming_queue_time avg: 32797.88, min 31176, max 33893, med 
32851
        30  rpc_incoming_queue_time avg: 28709.35714285714, min 28256, max 
29613, med 28559.5
        Change (10 & 30): -69.54234303132272% -73.33913802817781%
        Off utime+stime avg: 87865.17647058824, min 81530, max 98711, med 85818
        10  utime+stime avg: 76208.08, min 65998, max 83642, med 75503
        30  utime+stime avg: 68027.71428571429, min 52197, max 77436, med 
69101.0
        Change (10 & 30): -13.267026754894529% -22.577160806721064%
        Off stat_runtime avg: 89877829083.33333, min 85904071609, max 
93775583142, med 90007821176.0
        10  stat_runtime avg: 71741065414.63637, min 60447250324, max 
78006781555, med 73925084586
        30  stat_runtime avg: 61287185255.0, min 56450910546, max 71324482809, 
med 58686673832.5
        Change (10 & 30): -20.179352186934597% -31.810563428078055%
        Off switch avg: 844333.5, min 833024, max 855182, med 845283.0
        10  switch avg: 439501.7272727273, min 410184, max 471723, med 442107
        30  switch avg: 387624.75, min 366818, max 405787, med 388947.0
        Change (10 & 30): -47.94690400502558% -54.09103748696458%
    ========== write count: 21:==========
    Runs with batching off: 14, 10: 32, 30: 15
        Off client task runtimes avg: 16.00026828658824, min 13.41559386253357, 
max 17.893533945083618, med 15.739280700683594
        10  client task runtimes avg: 15.867586015945388, min 
10.710279941558838, max 21.145692586898804, med 15.130183696746826
        30  client task runtimes avg: 13.433500899208916, min 
6.494636297225952, max 15.636562824249268, med 14.214406967163086
        Change (10 & 30): -0.829250286722194% -16.0420271798245%
        Off cpu_stime avg: 17945.428571428572, min 13430, max 24512, med 15011.5
        10  cpu_stime avg: 10738.25, min 8612, max 17861, med 9526.5
        30  cpu_stime avg: 10200.266666666666, min 7895, max 15310, med 8659
        Change (10 & 30): -40.161640847649224% -43.15952597026966%
        Off cpu_utime avg: 72935.64285714286, min 65139, max 79458, med 73527.0
        10  cpu_utime avg: 71266.0625, min 51727, max 79497, med 70270.5
        30  cpu_utime avg: 63485.933333333334, min 50417, max 69125, med 65556
        Change (10 & 30): -2.2891144737190006% -12.956229839940425%
        Off rpc_incoming_queue_time avg: 111131.85714285714, min 109993, max 
112204, med 111220.5
        10  rpc_incoming_queue_time avg: 36567.9375, min 34429, max 38382, med 
36493.5
        30  rpc_incoming_queue_time avg: 32505.6, min 31728, max 33168, med 
32553
        Change (10 & 30): -67.09500008355582% -70.75042131419175%
        Off utime+stime avg: 90881.07142857143, min 78856, max 98983, med 
90825.0
        10  utime+stime avg: 82004.3125, min 60339, max 96027, med 80178.5
        30  utime+stime avg: 73686.2, min 58312, max 83714, med 74202
        Change (10 & 30): -9.76744528760115% -18.920190044288653%
        Off stat_runtime avg: 94648239082.2, min 92043681683, max 96604701517, 
med 95079846312
        10  stat_runtime avg: 85979736782.5, min 79247833687, max 94078032523, 
med 84953429029.5
        30  stat_runtime avg: 74313071072.0, min 63913748224, max 80499907081, 
med 76419314491.5
        Change (10 & 30): -9.15865142738851% -21.48499349527183%
        Off switch avg: 897395.0, min 875049, max 913416, med 901805
        10  switch avg: 474644.8333333333, min 453589, max 501224, med 473482.0
        30  switch avg: 434748.75, min 420292, max 452761, med 432971.0
        Change (10 & 30): -47.10859394878138% -51.55436012012547%
    ========== write count: 24:==========
    Runs with batching off: 18, 10: 21, 30: 12
        Off client task runtimes avg: 17.88868013134709, min 
14.400426149368286, max 20.726062059402466, med 17.609864950180054
        10  client task runtimes avg: 17.408534667600932, min 
9.953590393066406, max 21.59314775466919, med 17.0849187374115
        30  client task runtimes avg: 15.434335373004554, min 7.21691107749939, 
max 17.90212059020996, med 16.086348295211792
        Change (10 & 30): -2.684074287318594% -13.720099752030801%
        Off cpu_stime avg: 18045.666666666668, min 14732, max 26013, med 15529.0
        10  cpu_stime avg: 11599.238095238095, min 8406, max 18696, med 10275
        30  cpu_stime avg: 11924.666666666666, min 8512, max 16717, med 9473.0
        Change (10 & 30): -35.72286183993519% -33.91950052644218%
        Off cpu_utime avg: 78219.0, min 72128, max 81487, med 78120.0
        10  cpu_utime avg: 75045.85714285714, min 62685, max 86617, med 74246
        30  cpu_utime avg: 69422.08333333333, min 61531, max 73578, med 70479.5
        Change (10 & 30): -4.056741785426632% -11.246521518642105%
        Off rpc_incoming_queue_time avg: 114673.38888888889, min 112955, max 
116400, med 114882.5
        10  rpc_incoming_queue_time avg: 40207.28571428572, min 33425, max 
41661, med 40531
        30  rpc_incoming_queue_time avg: 36444.083333333336, min 35255, max 
37595, med 36542.0
        Change (10 & 30): -64.93756214596223% -68.21923230275743%
        Off utime+stime avg: 96264.66666666667, min 86860, max 106203, med 
95218.0
        10  utime+stime avg: 86645.09523809524, min 71091, max 96892, med 85338
        30  utime+stime avg: 81346.75, min 70043, max 88279, med 82009.5
        Change (10 & 30): -9.992837207737804% -15.496772786138225%
        Off stat_runtime avg: 97726065525.8, min 92716532372, max 100737581795, 
med 97800868584
        10  stat_runtime avg: 86468693080.0, min 82172604670, max 88534687430, 
med 87583740110.0
        30  stat_runtime avg: 81758720786.6, min 77844194290, max 84349902830, 
med 82248094233
        Change (10 & 30): -11.519314100318523% -16.33888016803927%
        Off switch avg: 911795.0, min 899299, max 929683, med 906367
        10  switch avg: 511426.5, min 475979, max 529168, med 520279.5
        30  switch avg: 472257.4, min 454054, max 495222, med 464972
        Change (10 & 30): -43.90992492830077% -48.20574800256636%
    
    Change-Id: Ie92ba4de5eae00d56cd513cb644dce8fb6e14538
    Reviewed-on: http://gerrit.cloudera.org:8080/22867
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/consensus/CMakeLists.txt                  |   1 +
 src/kudu/consensus/consensus.proto                 |  38 +++
 src/kudu/consensus/consensus_peers-test.cc         |   3 +
 src/kudu/consensus/consensus_peers.cc              | 168 +++++++++--
 src/kudu/consensus/consensus_peers.h               |  20 +-
 src/kudu/consensus/consensus_queue.cc              |  15 +-
 src/kudu/consensus/consensus_queue.h               |   3 +-
 src/kudu/consensus/multi_raft_batcher.cc           | 306 +++++++++++++++++++++
 src/kudu/consensus/multi_raft_batcher.h            | 184 +++++++++++++
 src/kudu/consensus/multi_raft_consensus_data.h     |  78 ++++++
 src/kudu/consensus/peer_manager.cc                 |  10 +-
 src/kudu/consensus/peer_manager.h                  |   4 +-
 src/kudu/consensus/raft_consensus.cc               |   2 +
 src/kudu/consensus/raft_consensus.h                |   2 +
 src/kudu/consensus/raft_consensus_quorum-test.cc   |   1 +
 src/kudu/master/sys_catalog.cc                     |   1 +
 src/kudu/tablet/tablet_replica-test-base.cc        |  11 +-
 src/kudu/tablet/tablet_replica.cc                  |   2 +
 src/kudu/tablet/tablet_replica.h                   |   2 +
 .../tserver/tablet_copy_source_session-test.cc     |   1 +
 src/kudu/tserver/tablet_service.cc                 | 106 ++++++-
 src/kudu/tserver/tablet_service.h                  |   7 +
 src/kudu/tserver/ts_tablet_manager.cc              |  13 +-
 src/kudu/tserver/ts_tablet_manager.h               |   3 +
 24 files changed, 937 insertions(+), 44 deletions(-)

diff --git a/src/kudu/consensus/CMakeLists.txt 
b/src/kudu/consensus/CMakeLists.txt
index 5d20b66cc..514bb2279 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -103,6 +103,7 @@ set(CONSENSUS_SRCS
   consensus_queue.cc
   leader_election.cc
   log_cache.cc
+  multi_raft_batcher.cc
   peer_manager.cc
   pending_rounds.cc
   quorum_util.cc
diff --git a/src/kudu/consensus/consensus.proto 
b/src/kudu/consensus/consensus.proto
index dea1eed93..fa10fa3de 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -416,6 +416,41 @@ message ConsensusResponsePB {
   optional tserver.TabletServerErrorPB error = 999;
 }
 
+// Same as ConsensusRequestPB. But dest_uuid and caller_uuid are shared between
+// messages.
+message BatchedNoOpConsensusRequestPB {
+  required string tablet_id = 1;
+  required int64 caller_term = 3;
+  optional OpId preceding_id = 4;
+  optional int64 committed_index = 8;
+  optional int64 all_replicated_index = 9;
+  optional fixed64 safe_timestamp = 10;
+  optional int64 last_idx_appended_to_leader = 11;
+}
+
+// Batched NoOp consensus updates.
+message MultiRaftConsensusRequestPB {
+  repeated BatchedNoOpConsensusRequestPB consensus_requests = 1;
+  optional bytes dest_uuid = 2;
+  required bytes caller_uuid = 3;
+}
+
+// Same as ConsensusResponsePB. But responder_uuid and certain types of errors
+// are shared between messages.
+message BatchedNoOpConsensusResponsePB {
+  optional int64 responder_term = 1;
+  optional ConsensusStatusPB status = 2;
+  optional bool server_quiescing = 3;
+  optional tserver.TabletServerErrorPB error = 999;
+}
+
+message MultiRaftConsensusResponsePB {
+  repeated BatchedNoOpConsensusResponsePB consensus_responses = 1;
+  optional bytes responder_uuid = 2;
+  // For errors shared between tablets (e.g. wrong caller id)
+  optional tserver.TabletServerErrorPB error = 999;
+}
+
 // A message reflecting the status of an in-flight op.
 message OpStatusPB {
   required OpId op_id = 1;
@@ -587,6 +622,9 @@ service ConsensusService {
   // Analogous to AppendEntries in Raft, but only used for followers.
   rpc UpdateConsensus(ConsensusRequestPB) returns (ConsensusResponsePB);
 
+  // Batched call of ConsensusRequestPB. Only exists for performance reasons.
+  rpc MultiRaftUpdateConsensus(MultiRaftConsensusRequestPB) returns 
(MultiRaftConsensusResponsePB);
+
   // RequestVote() from Raft.
   rpc RequestConsensusVote(VoteRequestPB) returns (VoteResponsePB);
 
diff --git a/src/kudu/consensus/consensus_peers-test.cc 
b/src/kudu/consensus/consensus_peers-test.cc
index 236f730f9..ebcaf98ab 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -148,6 +148,7 @@ class ConsensusPeersTest : public KuduTest {
                         kTabletId,
                         kLeaderUuid,
                         message_queue_.get(),
+                        nullptr, // We do not use heartbeat batching.
                         raft_pool_token_.get(),
                         &factory,
                         peer);
@@ -288,6 +289,7 @@ TEST_F(ConsensusPeersTest, 
TestCloseWhenRemotePeerDoesntMakeProgress) {
                       kTabletId,
                       kLeaderUuid,
                       message_queue_.get(),
+                      nullptr, // We do not use heartbeat batching.
                       raft_pool_token_.get(),
                       &factory,
                       &peer);
@@ -326,6 +328,7 @@ TEST_F(ConsensusPeersTest, 
TestDontSendOneRpcPerWriteWhenPeerIsDown) {
                       kTabletId,
                       kLeaderUuid,
                       message_queue_.get(),
+                      nullptr, // We do not use heartbeat batching.
                       raft_pool_token_.get(),
                       &factory,
                       &peer);
diff --git a/src/kudu/consensus/consensus_peers.cc 
b/src/kudu/consensus/consensus_peers.cc
index bc25f450e..6270ec7dc 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -37,6 +37,8 @@
 #include "kudu/consensus/consensus.proxy.h"
 #include "kudu/consensus/consensus_queue.h"
 #include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/multi_raft_batcher.h" // IWYU pragma: keep
+#include "kudu/consensus/multi_raft_consensus_data.h"
 #include "kudu/consensus/opid_util.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
@@ -108,6 +110,7 @@ void Peer::NewRemotePeer(RaftPeerPB peer_pb,
                          string tablet_id,
                          string leader_uuid,
                          PeerMessageQueue* queue,
+                         std::shared_ptr<MultiRaftHeartbeatBatcher> 
multi_raft_batcher,
                          ThreadPoolToken* raft_pool_token,
                          PeerProxyFactory* peer_proxy_factory,
                          shared_ptr<Peer>* peer) {
@@ -116,6 +119,7 @@ void Peer::NewRemotePeer(RaftPeerPB peer_pb,
       std::move(tablet_id),
       std::move(leader_uuid),
       queue,
+      multi_raft_batcher,
       raft_pool_token,
       peer_proxy_factory));
   new_peer->Init();
@@ -126,6 +130,7 @@ Peer::Peer(RaftPeerPB peer_pb,
            string tablet_id,
            string leader_uuid,
            PeerMessageQueue* queue,
+           std::shared_ptr<MultiRaftHeartbeatBatcher> multi_raft_batcher,
            ThreadPoolToken* raft_pool_token,
            PeerProxyFactory* peer_proxy_factory)
     : tablet_id_(std::move(tablet_id)),
@@ -138,6 +143,8 @@ Peer::Peer(RaftPeerPB peer_pb,
       queue_(queue),
       failed_attempts_(0),
       messenger_(peer_proxy_factory_->messenger()),
+      multi_raft_batcher_(multi_raft_batcher),
+      multi_raft_batcher_registration_(std::nullopt),
       raft_pool_token_(raft_pool_token),
       request_pending_(false),
       closed_(false),
@@ -151,18 +158,34 @@ void Peer::Init() {
     queue_->TrackPeer(peer_pb_);
   }
 
-  // Capture a weak_ptr reference into the functor so it can safely handle
-  // outliving the peer.
   weak_ptr<Peer> w_this = shared_from_this();
-  heartbeater_ = PeriodicTimer::Create(
-      messenger_,
-      [w_this = std::move(w_this)]() {
-        if (auto p = w_this.lock()) {
-          WARN_NOT_OK(p->SignalRequest(true), "SignalRequest failed");
-        }
-      },
-      MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
-  heartbeater_->Start();
+  if (multi_raft_batcher_) {
+    // We must capture a weak_ptr here: capturing a shared_ptr would allow
+    // ~Peer to run on the raft thread pool, which might cause a deadlock.
+    multi_raft_batcher_registration_ =
+        multi_raft_batcher_->Subscribe({[w_this](MultiRaftConsensusData* data) {
+          if (auto this_ptr = w_this.lock()) {
+            // If the request is already pending, we do not send it again.
+            if (this_ptr->request_pending_) {
+              return;
+            }
+            this_ptr->SendNextRequest(true, data);
+          }
+        }});
+
+  } else {
+    // Capture a weak_ptr reference into the functor so it can safely handle
+    // outliving the peer.
+    heartbeater_ = PeriodicTimer::Create(
+        messenger_,
+        [w_this = std::move(w_this)]() {
+          if (auto p = w_this.lock()) {
+            WARN_NOT_OK(p->SignalRequest(true), "SignalRequest failed");
+          }
+        },
+        MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
+    heartbeater_->Start();
+  }
 }
 
 Status Peer::SignalRequest(bool even_if_queue_empty) {
@@ -184,20 +207,20 @@ Status Peer::SignalRequest(bool even_if_queue_empty) {
   // Capture a weak_ptr reference into the submitted functor so that we can
   // safely handle the functor outliving its peer.
   weak_ptr<Peer> w_this(shared_from_this());
-  return raft_pool_token_->Submit([even_if_queue_empty, w_this = std::move(w_this)]() {
-    if (auto p = w_this.lock()) {
-      p->SendNextRequest(even_if_queue_empty);
-    }
+  return raft_pool_token_->Submit(
+    [even_if_queue_empty, w_this = std::move(w_this)]() {
+      if (auto p = w_this.lock()) {
+        p->SendNextRequest(even_if_queue_empty, nullptr);
+      }
   });
 }
 
-void Peer::SendNextRequest(bool even_if_queue_empty) {
+void Peer::SendNextRequest(bool even_if_queue_empty, MultiRaftConsensusData* mrc_data) {
   std::unique_lock l(peer_lock_);
   if (PREDICT_FALSE(closed_)) {
     return;
   }
 
-  // Only allow one request at a time.
   if (request_pending_) {
     return;
   }
@@ -227,16 +250,44 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
   bool needs_tablet_copy = false;
   int64_t commit_index_before = request_.has_committed_index() ?
       request_.committed_index() : kMinimumOpIdIndex;
-  Status s = queue_->RequestForPeer(peer_pb_.permanent_uuid(), &request_,
-                                    &replicate_msg_refs_, &needs_tablet_copy);
-  int64_t commit_index_after = request_.has_committed_index() ?
-      request_.committed_index() : kMinimumOpIdIndex;
+  Status s;
+  if (mrc_data) {
+    bool ops_pending = false;
+    s = queue_->RequestForPeer(peer_pb_.permanent_uuid(),
+                               &request_,
+                               &replicate_msg_refs_,
+                               &needs_tablet_copy,
+                               &ops_pending);
+    if (s.ok() && (ops_pending || (request_.has_committed_index() &&
+                                      request_.committed_index() > commit_index_before))) {
+      weak_ptr<Peer> w_this = shared_from_this();
+      // We don't want to send out a heavy request on the batching thread.
+      // So if we have any ops, we submit a task to the raft thread pool.
+      Status s2 = raft_pool_token_->Submit([w_this, even_if_queue_empty]() {
+        if (auto p = w_this.lock()) {
+          WARN_NOT_OK(p->SignalRequest(even_if_queue_empty), "SignalRequest failed");
+        }
+      });
+      if (s2.ok()) {
+        return;
+      }
+      DCHECK(false);
+      LOG_WITH_PREFIX_UNLOCKED(WARNING)
+          << "Failed to submit SignalRequest callback, processing heartbeat with"
+          << " ops on mrc thread";
+    }
+  } else {
+    s = queue_->RequestForPeer(
+        peer_pb_.permanent_uuid(), &request_, &replicate_msg_refs_, &needs_tablet_copy);
+  }
 
   if (PREDICT_FALSE(!s.ok())) {
     VLOG_WITH_PREFIX_UNLOCKED(1) << s.ToString();
     return;
   }
 
+  int64_t commit_index_after = request_.has_committed_index() ?
+      request_.committed_index() : kMinimumOpIdIndex;
   // NOTE: we only perform this check after creating the RequestForPeer() call
   // to ensure any peer health updates that happen therein associated with this
   // peer actually happen. E.g. if we haven't been able to create a proxy in a
@@ -274,7 +325,12 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
 
   if (req_has_ops) {
     // If we're actually sending ops there's no need to heartbeat for a while.
-    heartbeater_->Snooze();
+    if (multi_raft_batcher_) {
+      // TODO(martonka) add snooze logic to MultiRaftHeartbeatBatcher (I don't
+      // expect any measurable performance impact).
+    } else {
+      heartbeater_->Snooze();
+    }
   }
 
   if (!has_sent_first_request_) {
@@ -291,16 +347,36 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
       << SecureShortDebugString(request_);
 
   controller_.Reset();
+
   request_pending_ = true;
   l.unlock();
 
   // Capture a shared_ptr reference into the RPC callback so that we're guaranteed
   // that this object outlives the RPC.
   shared_ptr<Peer> s_this = shared_from_this();
-  proxy_->UpdateAsync(request_, &response_, &controller_,
-                      [s_this]() {
-                        s_this->ProcessResponse();
-                      });
+
+  if (mrc_data && !req_has_ops) {
+    if (mrc_data->batch_req.consensus_requests_size() == 0) {
+      // If this is the first request in the batch, set the shared
+      // caller and destination UUIDs.
+      mrc_data->batch_req.set_caller_uuid(request_.caller_uuid());
+      mrc_data->batch_req.set_dest_uuid(request_.dest_uuid());
+    } else {
+      DCHECK(mrc_data->batch_req.caller_uuid() == request_.caller_uuid());
+      DCHECK(mrc_data->batch_req.dest_uuid() == request_.dest_uuid());
+    }
+    *mrc_data->batch_req.add_consensus_requests() = ToNoOpRequest(request_);
+    mrc_data->response_callback_data.emplace_back(
+        [s_this](const rpc::RpcController& controller,
+                 const MultiRaftConsensusResponsePB& root,
+                 const BatchedNoOpConsensusResponsePB* resp) {
+          s_this->ProcessResponseFromBatch(controller, root, resp);
+        });
+  } else {
+    DCHECK(!mrc_data) << "Messages with ops should not be sent on MRC thread";
+    proxy_->UpdateAsync(
+        request_, &response_, &controller_, [s_this]() { s_this->ProcessSingleResponse(); });
+  }
 }
 
 void Peer::StartElection() {
@@ -335,7 +411,39 @@ void Peer::StartElection() {
     });
 }
 
-void Peer::ProcessResponse() {
+void Peer::ProcessResponseFromBatch(const rpc::RpcController& controller,
+                                    const MultiRaftConsensusResponsePB& root,
+                                    const BatchedNoOpConsensusResponsePB* resp) {
+  response_.Clear();
+  if (root.has_error()) {
+    *response_.mutable_error() = root.error();
+  }
+  if (root.has_responder_uuid()) {
+    response_.set_responder_uuid(root.responder_uuid());
+  }
+  if (resp != nullptr) {
+    if (resp->has_responder_term()) {
+      response_.set_responder_term(resp->responder_term());
+    }
+    if (resp->has_status()) {
+      *response_.mutable_status() = resp->status();
+    }
+    if (resp->has_server_quiescing()) {
+      response_.set_server_quiescing(resp->server_quiescing());
+    }
+    if (resp->has_error()) {
+      *response_.mutable_error() = resp->error();
+    }
+  }
+
+  ProcessResponse(controller);
+}
+
+void Peer::ProcessSingleResponse() {
+  ProcessResponse(controller_);
+}
+
+void Peer::ProcessResponse(const rpc::RpcController& controller) {
   // Note: This method runs on the reactor thread.
   std::lock_guard lock(peer_lock_);
   if (PREDICT_FALSE(closed_)) {
@@ -346,7 +454,7 @@ void Peer::ProcessResponse() {
   MAYBE_FAULT(FLAGS_fault_crash_after_leader_request_fraction);
 
   // Process RpcController errors.
-  const auto controller_status = controller_.status();
+  const auto controller_status = controller.status();
   if (!controller_status.ok()) {
     auto ps = controller_status.IsRemoteError() ?
         PeerStatus::REMOTE_ERROR : PeerStatus::RPC_LAYER_ERROR;
@@ -526,6 +634,10 @@ void Peer::Close() {
   }
   {
     std::lock_guard lock(peer_lock_);
+    if (multi_raft_batcher_registration_) {
+      DCHECK(multi_raft_batcher_);
+      multi_raft_batcher_->Unsubscribe(*multi_raft_batcher_registration_);
+    }
     closed_ = true;
   }
   VLOG_WITH_PREFIX_UNLOCKED(1) << "Closing peer: " << peer_pb_.permanent_uuid();
diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h
index 60905dc45..badf259fb 100644
--- a/src/kudu/consensus/consensus_peers.h
+++ b/src/kudu/consensus/consensus_peers.h
@@ -19,6 +19,7 @@
 #include <atomic>
 #include <cstdint>
 #include <memory>
+#include <optional>
 #include <ostream>
 #include <string>
 #include <vector>
@@ -46,6 +47,8 @@ class PeriodicTimer;
 }
 
 namespace consensus {
+class MultiRaftHeartbeatBatcher;
+struct MultiRaftConsensusData;
 class PeerMessageQueue;
 class PeerProxy;
 class PeerProxyFactory;
@@ -118,6 +121,7 @@ class Peer :
                             std::string tablet_id,
                             std::string leader_uuid,
                             PeerMessageQueue* queue,
+                            std::shared_ptr<MultiRaftHeartbeatBatcher> multi_raft_batcher,
                             ThreadPoolToken* raft_pool_token,
                             PeerProxyFactory* peer_proxy_factory,
                             std::shared_ptr<Peer>* peer);
@@ -127,18 +131,27 @@ class Peer :
        std::string tablet_id,
        std::string leader_uuid,
        PeerMessageQueue* queue,
+       std::shared_ptr<MultiRaftHeartbeatBatcher> multi_raft_batcher,
        ThreadPoolToken* raft_pool_token,
        PeerProxyFactory* peer_proxy_factory);
 
  private:
-  void SendNextRequest(bool even_if_queue_empty);
+  // If 'mrc_data' is set, the periodic no-op heartbeat is appended to that
+  // batch instead of being sent in its own RPC, reducing system load.
+  void SendNextRequest(bool even_if_queue_empty, MultiRaftConsensusData* mrc_data = nullptr);
+
+  void ProcessResponseFromBatch(const rpc::RpcController& controller,
+                                const MultiRaftConsensusResponsePB& root,
+                                const BatchedNoOpConsensusResponsePB* resp);
+
+  void ProcessSingleResponse();
 
   // Signals that a response was received from the peer.
   //
   // This method is called from the reactor thread and calls
   // DoProcessResponse() on raft_pool_token_ to do any work that requires IO or
   // lock-taking.
-  void ProcessResponse();
+  void ProcessResponse(const rpc::RpcController& controller);
 
   // Run on 'raft_pool_token'. Does response handling that requires IO or may block.
   void DoProcessResponse();
@@ -201,6 +214,9 @@ class Peer :
 
   std::shared_ptr<rpc::Messenger> messenger_;
 
+  std::shared_ptr<MultiRaftHeartbeatBatcher> multi_raft_batcher_;
+  std::optional<uint64_t> multi_raft_batcher_registration_;
+
   // Thread pool token used to construct requests to this peer.
   //
   // RaftConsensus owns this shared token and is responsible for destroying it.
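Reviewer note: the weak_ptr capture in Peer::Init() above is the pattern that lets a batched heartbeat callback safely outlive its Peer. A standalone sketch of the idea, with illustrative names (this is not Kudu code):

```cpp
// A batcher stores callbacks that capture only a weak_ptr to the subscriber.
// When the subscriber is destroyed, its callback silently becomes a no-op
// instead of keeping the object alive or dereferencing freed memory.
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

struct Subscriber {
  int beats = 0;
  void Beat() { ++beats; }
};

class Batcher {
 public:
  void Subscribe(const std::weak_ptr<Subscriber>& w) {
    callbacks_.emplace_back([w]() {
      if (auto p = w.lock()) {  // No-op if the subscriber is already gone.
        p->Beat();
      }
    });
  }
  void Tick() {
    for (const auto& cb : callbacks_) {
      cb();
    }
  }

 private:
  std::vector<std::function<void()>> callbacks_;
};

int RunDemo() {
  Batcher batcher;
  auto alive = std::make_shared<Subscriber>();
  auto doomed = std::make_shared<Subscriber>();
  batcher.Subscribe(alive);
  batcher.Subscribe(doomed);
  doomed.reset();  // Destroy one subscriber before the tick fires.
  batcher.Tick();  // Only the live subscriber is called.
  return alive->beats;
}
```

This is also why the patch stresses a weak_ptr rather than shared_ptr subscription: a shared_ptr capture could make ~Peer run on the batcher's thread pool.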
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index def546acf..0e5f40c1b 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -646,7 +646,8 @@ HealthReportPB::HealthStatus PeerMessageQueue::PeerHealthStatus(const TrackedPee
 Status PeerMessageQueue::RequestForPeer(const string& uuid,
                                         ConsensusRequestPB* request,
                                         vector<ReplicateRefPtr>* msg_refs,
-                                        bool* needs_tablet_copy) {
+                                        bool* needs_tablet_copy,
+                                        bool* return_no_ops_only) {
   // Maintain a thread-safe copy of necessary members.
   OpId preceding_id;
   int64_t current_term;
@@ -664,6 +665,13 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
     }
     peer_copy = *peer;
 
+    if (return_no_ops_only) {
+      *return_no_ops_only = peer_copy.last_exchange_status == PeerStatus::TABLET_NOT_FOUND
+        || log_cache_.HasOpBeenWritten(peer->next_index);
+      if (*return_no_ops_only) {
+        return Status::OK(); // Nothing is touched; we can try again from a different thread later.
+      }
+    }
     // Clear the requests without deleting the entries, as they may be in use by other peers.
     request->mutable_ops()->UnsafeArenaExtractSubrange(0, request->ops_size(), nullptr);
 
@@ -720,6 +728,11 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
     vector<ReplicateRefPtr> messages;
     int64_t max_batch_size = FLAGS_consensus_max_batch_size_bytes - request->ByteSizeLong();
 
+    // Messages may have been appended since the early-return check above, so
+    // there is a very small chance we get here with 'return_no_ops_only' set;
+    // in that case, read no ops.
+    if (return_no_ops_only != nullptr) {
+      max_batch_size = 0;
+    }
     // We try to get the follower's next_index from our log.
     Status s = log_cache_.ReadOps(peer_copy.next_index - 1,
                                   max_batch_size,
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 16d1f7ed3..432c737d3 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -276,7 +276,8 @@ class PeerMessageQueue {
   Status RequestForPeer(const std::string& uuid,
                         ConsensusRequestPB* request,
                         std::vector<ReplicateRefPtr>* msg_refs,
-                        bool* needs_tablet_copy);
+                        bool* needs_tablet_copy,
+                        bool* return_no_ops_only = nullptr);
 
   // Fill in a StartTabletCopyRequest for the specified peer.
   // If that peer should not initiate Tablet Copy, returns a non-OK status.
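For context, the 'return_no_ops_only' plumbing above feeds the dispatch decision in Peer::SendNextRequest(): a request that carries ops, or advances the commit index, is too heavy for the shared batching thread and is re-submitted to the raft pool, while a pure no-op stays in the batch. A simplified, hypothetical sketch of that decision (names are illustrative, not Kudu's):

```cpp
#include <cassert>
#include <cstdint>

enum class Path { kBatch, kRaftPool };

// Mirrors the checks in the patch: anything with pending ops or a
// commit-index advance gets its own raft-pool task; cheap no-op
// heartbeats are appended to the shared batch.
Path ChoosePath(bool ops_pending, int64_t commit_before, int64_t commit_after) {
  if (ops_pending || commit_after > commit_before) {
    return Path::kRaftPool;  // Heavy request: submit a separate task.
  }
  return Path::kBatch;  // Pure no-op heartbeat: append to the batch.
}
```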
diff --git a/src/kudu/consensus/multi_raft_batcher.cc b/src/kudu/consensus/multi_raft_batcher.cc
new file mode 100644
index 000000000..94e76539e
--- /dev/null
+++ b/src/kudu/consensus/multi_raft_batcher.cc
@@ -0,0 +1,306 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/consensus/multi_raft_batcher.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <mutex> // IWYU pragma: keep
+#include <ostream>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus.proxy.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/multi_raft_consensus_data.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/rpc/periodic.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
+
+namespace kudu {
+class DnsResolver;
+
+namespace rpc {
+class Messenger;
+}  // namespace rpc
+}  // namespace kudu
+
+DECLARE_int32(consensus_rpc_timeout_ms);
+DECLARE_int32(raft_heartbeat_interval_ms);
+
+DEFINE_int32(multi_raft_heartbeat_window_ms, 100,
+  "The batch time window for heartbeat batching. "
+  "For minimal delay and maximum effectiveness, set "
+  "multi_raft_heartbeat_window_ms = raft_heartbeat_interval_ms * "
+  "(batch_size / {estimated follower peers on the same host}) + {a little tolerance}; "
+  "however, 0.1 * raft_heartbeat_interval_ms is good enough, and there is no "
+  "reason to set it any lower. "
+  "This value is also forced to be at most half of the heartbeat interval, "
+  "because it makes no sense to introduce a delay comparable to the heartbeat "
+  "interval.");
+TAG_FLAG(multi_raft_heartbeat_window_ms, experimental);
+
+DEFINE_bool(enable_multi_raft_heartbeat_batcher,
+            false,
+            "Whether to enable the batching of raft heartbeats.");
+TAG_FLAG(enable_multi_raft_heartbeat_batcher, experimental);
+
+DEFINE_int32(multi_raft_batch_size, 30, "Maximum batch size for a multi-raft consensus payload.");
+TAG_FLAG(multi_raft_batch_size, experimental);
+DEFINE_validator(multi_raft_batch_size,
+                 [](const char* /*flagname*/, int32_t value) { return value > 0; });
+
+namespace kudu {
+namespace consensus {
+
+using kudu::DnsResolver;
+using rpc::PeriodicTimer;
+
+uint64_t MultiRaftHeartbeatBatcher::Subscribe(const PeriodicHeartbeater& heartbeater) {
+  std::lock_guard l(heartbeater_lock_);
+  DCHECK(!closed_);
+  auto it = peers_.insert({next_id_, heartbeater});
+  DCHECK(it.second) << "Peer with id " << next_id_ << " already exists";
+  queue_.push_back(
+      {MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms), next_id_});
+  return next_id_++;
+}
+
+void MultiRaftHeartbeatBatcher::Unsubscribe(uint64_t id) {
+  std::lock_guard l(heartbeater_lock_);
+  DCHECK_EQ(1, peers_.count(id));
+  peers_.erase(id);
+}
+
+MultiRaftHeartbeatBatcher::MultiRaftHeartbeatBatcher(
+    const kudu::HostPort& hostport,
+    DnsResolver* dns_resolver,
+    std::shared_ptr<kudu::rpc::Messenger> messenger,
+    MonoDelta flush_interval,
+    std::shared_ptr<ThreadPoolToken> raft_pool_token)
+    : messenger_(std::move(messenger)),
+      consensus_proxy_(std::make_unique<ConsensusServiceProxy>(messenger_, hostport, dns_resolver)),
+      batch_time_window_(flush_interval),
+      raft_pool_token_(std::move(raft_pool_token)),
+      closed_(false) {}
+
+void MultiRaftHeartbeatBatcher::StartTimer() {
+  const std::weak_ptr<MultiRaftHeartbeatBatcher> weak_peer = shared_from_this();
+  DCHECK(!heartbeat_timer_) << "Heartbeat timer started twice";
+  heartbeat_timer_ = PeriodicTimer::Create(
+      messenger_,
+      [weak_peer]() {
+        if (auto peer = weak_peer.lock()) {
+          peer->PrepareNextBatch();
+        }
+      },
+      batch_time_window_);
+  heartbeat_timer_->Start();
+}
+
+void MultiRaftHeartbeatBatcher::PrepareNextBatch() {
+  std::vector<PeriodicHeartbeater> current_calls;
+  current_calls.reserve(FLAGS_multi_raft_batch_size);
+
+  auto submit_callbacks = [&]() {
+    auto this_ptr = shared_from_this();
+    KUDU_WARN_NOT_OK(
+        raft_pool_token_->Submit([this_ptr, current_calls = std::move(current_calls)]() {
+          this_ptr->SendOutScheduled(current_calls);
+        }),
+        "Failed to submit multi-raft heartbeat batcher task");
+    current_calls.clear();
+    current_calls.reserve(FLAGS_multi_raft_batch_size);
+  };
+
+  const auto sending_period = MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms);
+
+  std::lock_guard lock(heartbeater_lock_);
+
+  if (closed_) {
+    return;  // Batcher is closed, raft_pool_token_ might be invalid.
+  }
+  const auto send_until = MonoTime::Now() + batch_time_window_;
+
+  while (!queue_.empty() && queue_.front().time <= send_until) {
+    auto front = queue_.front();
+    queue_.pop_front();
+    auto peer = FindOrNull(peers_, front.id);
+    if (peer == nullptr) {
+      continue;  // Peer was unsubscribed.
+    }
+    // TODO(martonka): MonoTime::Now() + sending_period would make much more sense,
+    // but this is consistent with the old timer behavior.
+    auto next_time = front.time + sending_period;
+    queue_.push_back({next_time, front.id});
+    current_calls.emplace_back(*peer);
+    if (current_calls.size() >= FLAGS_multi_raft_batch_size) {
+      submit_callbacks();
+    }
+  }
+  if (!current_calls.empty()) {
+    submit_callbacks();
+  }
+}
+
+void MultiRaftHeartbeatBatcher::MultiRaftUpdateHeartbeatResponseCallback(
+    std::shared_ptr<MultiRaftConsensusData> data) {
+  for (int i = 0; i < data->batch_req.consensus_requests_size(); i++) {
+    const auto* resp = data->batch_res.consensus_responses_size() > i
+                           ? &data->batch_res.consensus_responses(i)
+                           : nullptr;
+    data->response_callback_data[i](data->controller, data->batch_res, resp);
+  }
+}
+
+void MultiRaftHeartbeatBatcher::Shutdown() {
+  {
+    std::lock_guard lock(heartbeater_lock_);
+    if (closed_) {
+      return;  // Already closed.
+    }
+    closed_ = true;
+  }
+  if (heartbeat_timer_) {
+    heartbeat_timer_->Stop();
+    heartbeat_timer_.reset();
+  }
+}
+
+void MultiRaftHeartbeatBatcher::SendOutScheduled(
+    const std::vector<PeriodicHeartbeater>& scheduled_callbacks) {
+  // No need to hold the lock here, as we are not modifying the state of the batcher.
+  DCHECK_LT(0, scheduled_callbacks.size());
+  DCHECK_GE(FLAGS_multi_raft_batch_size, scheduled_callbacks.size());
+  auto data = std::make_shared<MultiRaftConsensusData>(scheduled_callbacks.size());
+
+  for (const auto& cb : scheduled_callbacks) {
+    cb(data.get());
+  }
+  // If all invoked heartbeaters already have another call in progress, or have
+  // pending elements in their queues, we might end up with an empty batch.
+  if (data->batch_req.consensus_requests_size() == 0) {
+    return;
+  }
+
+  DCHECK(data->batch_req.IsInitialized());
+  VLOG(1) << "Sending BatchRequest with size: " << data->batch_req.consensus_requests_size();
+
+  data->controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));
+
+  // Capture the 'data' shared pointer in the callback to keep it alive until the RPC completes.
+  consensus_proxy_->MultiRaftUpdateConsensusAsync(
+      data->batch_req, &data->batch_res, &data->controller, [data, inst = shared_from_this()]() {
+        inst->MultiRaftUpdateHeartbeatResponseCallback(data);
+      });
+}
+
+namespace {
+MonoDelta GetTimeWindow() {
+  // We don't want to delay a heartbeat by more than half of the interval.
+  // Even a quarter of the interval is a questionable delay;
+  // half is definitely too much.
+  auto flush_interval =
+      std::min(FLAGS_multi_raft_heartbeat_window_ms, FLAGS_consensus_rpc_timeout_ms / 2);
+  if (flush_interval != FLAGS_multi_raft_heartbeat_window_ms) {
+    LOG(ERROR) << "multi_raft_heartbeat_window_ms should not be more than "
+               << "consensus_rpc_timeout_ms / 2; forcing multi_raft_heartbeat_window_ms = "
+               << flush_interval;
+  }
+  return MonoDelta::FromMilliseconds(flush_interval);
+}
+}  // namespace
+
+MultiRaftManager::MultiRaftManager(kudu::DnsResolver* dns_resolver,
+                                   const scoped_refptr<MetricEntity>& entity)
+  : dns_resolver_(dns_resolver),
+    batch_time_window_(GetTimeWindow()),
+    closed_(false)
+{}
+
+void MultiRaftManager::Init(const std::shared_ptr<rpc::Messenger>& messenger,
+                            ThreadPool* raft_pool) {
+  DCHECK(!closed_);
+  messenger_ = messenger;
+  raft_pool_ = raft_pool;
+}
+
+MultiRaftHeartbeatBatcherPtr MultiRaftManager::AddOrGetBatcher(
+    const kudu::consensus::RaftPeerPB& remote_peer_pb) {
+  if (!FLAGS_enable_multi_raft_heartbeat_batcher) {
+    return nullptr;  // Batching is disabled.
+  }
+
+  DCHECK(messenger_);
+  DCHECK(raft_pool_);
+  auto hostport = HostPortFromPB(remote_peer_pb.last_known_addr());
+  std::lock_guard lock(mutex_);
+  if (closed_) {
+    DCHECK(false) << "We are in shutdown phase but still adding a peer. Should never happen";
+    return nullptr;
+  }
+  MultiRaftHeartbeatBatcherPtr batcher;
+
+  // After taking the lock, check if there is already a batcher
+  // for the same remote host and return it.
+  // If we used to have replicas shared with this host, but they were all removed,
+  // the old batcher might be deleted already (so the weak ptr is expired).
+  auto res = FindOrNull(batchers_, hostport);
+  if (res && (batcher = res->batcher.lock())) {
+    return batcher;
+  }
+  // We still have the token for this host, even if the batcher was deleted.
+  auto raft_pool_token = res ? res->raft_pool_token
+                             : std::shared_ptr<ThreadPoolToken>(
+                                   raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT));
+  batcher = std::make_shared<MultiRaftHeartbeatBatcher>(
+      hostport, dns_resolver_, messenger_, batch_time_window_, raft_pool_token);
+  batcher->StartTimer();
+  batchers_.insert({hostport, BatcherAndPoolToken(batcher, raft_pool_token)});
+  return batcher;
+}
+
+void MultiRaftManager::Shutdown() {
+  std::unordered_map<HostPort, BatcherAndPoolToken, HostPortHasher> batchers;
+  {
+    std::lock_guard lock(mutex_);
+    batchers.swap(batchers_);
+  }
+  for (auto& entry : batchers) {
+    if (auto batcher = entry.second.batcher.lock()) {
+      batcher->Shutdown();
+    }
+    entry.second.raft_pool_token->Shutdown();
+  }
+  batchers_.clear();
+}
+
+}  // namespace consensus
+}  // namespace kudu
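The queue_ handling in PrepareNextBatch() above relies on all subscribers sharing the same period: re-pushing each fired entry with "fire time + period" keeps the deque sorted by due time without a priority queue. A self-contained sketch of that invariant, simplified to integer timestamps instead of MonoTime (illustrative, not Kudu code):

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

struct Entry {
  int64_t due;  // Next due time for this subscriber.
  uint64_t id;  // Subscriber id.
};

// Pops every entry due by 'now + window', returns the fired ids, and
// re-queues each entry with its next due time. Because 'period' is shared
// by all entries, appending at the back preserves the sort order.
std::vector<uint64_t> CollectDue(std::deque<Entry>* q, int64_t now,
                                 int64_t window, int64_t period) {
  std::vector<uint64_t> fired;
  const int64_t send_until = now + window;
  while (!q->empty() && q->front().due <= send_until) {
    Entry e = q->front();
    q->pop_front();
    q->push_back({e.due + period, e.id});
    fired.push_back(e.id);
  }
  return fired;
}
```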
diff --git a/src/kudu/consensus/multi_raft_batcher.h b/src/kudu/consensus/multi_raft_batcher.h
new file mode 100644
index 000000000..b09406bee
--- /dev/null
+++ b/src/kudu/consensus/multi_raft_batcher.h
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <deque>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "kudu/consensus/consensus.proxy.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+
+namespace kudu {
+class DnsResolver;
+class MetricEntity;
+class ThreadPool;
+class ThreadPoolToken;
+
+namespace rpc {
+class Messenger;
+class PeriodicTimer;
+class RpcController;
+}  // namespace rpc
+
+namespace consensus {
+class BatchedNoOpConsensusResponsePB;
+class MultiRaftConsensusResponsePB;
+class RaftPeerPB;
+
+typedef std::unique_ptr<ConsensusServiceProxy> ConsensusServiceProxyPtr;
+
+using HeartbeatResponseCallback = std::function<void(const rpc::RpcController&,
+                                                     const MultiRaftConsensusResponsePB&,
+                                                     const BatchedNoOpConsensusResponsePB*)>;
+
+// - MultiRaftHeartbeatBatcher is responsible for batching the processing
+//   and sending of no-op heartbeats, saving CPU and network resources.
+// - Peers can request (using Subscribe) to be called back on a
+//   {raft_heartbeat_interval_ms +/- multi_raft_heartbeat_window_ms} interval,
+//   together with other peers that are connected to the same host.
+//   The average interval over a long time will be raft_heartbeat_interval_ms.
+// - This allows for less context switching and slightly smaller network
+//   overhead, as some fields can be shared and RPC calls can be batched.
+// - The subscribed peers are called back with a MultiRaftConsensusData
+//   object, and it is the responsibility of the called peer to:
+//     + Correctly append to the MultiRaftConsensusData object.
+//     + Fill the shared fields if needed.
+//     + If it would do anything CPU-intensive (e.g. there are pending items
+//       in its queue), submit another task to the raft pool and return early,
+//       so as not to block the other peers in the same batch.
+struct MultiRaftConsensusData;
+
+class MultiRaftHeartbeatBatcher : public std::enable_shared_from_this<MultiRaftHeartbeatBatcher> {
+ public:
+  MultiRaftHeartbeatBatcher(const kudu::HostPort& hostport,
+                            ::kudu::DnsResolver* dns_resolver,
+                            std::shared_ptr<rpc::Messenger> messenger,
+                            MonoDelta flush_interval,
+                            std::shared_ptr<ThreadPoolToken> raft_pool_token);
+
+  ~MultiRaftHeartbeatBatcher() = default;
+
+  using PeriodicHeartbeater = std::function<void(MultiRaftConsensusData*)>;
+  // Subscribe for periodic callbacks batched with other peers.
+  // Returns a unique id for the peer, which can be used to unsubscribe.
+  uint64_t Subscribe(const PeriodicHeartbeater& heartbeater);
+  void Unsubscribe(uint64_t id);
+
+ private:
+  friend class MultiRaftManager;
+
+  // Collect the heartbeats due in the next flush interval.
+  // Submit a task to the Raft pool to send them in a single RPC call per batch.
+  void PrepareNextBatch();
+
+  void Shutdown();
+
+  void StartTimer();
+
+  void MultiRaftUpdateHeartbeatResponseCallback(std::shared_ptr<MultiRaftConsensusData> data);
+
+  void SendOutScheduled(const std::vector<PeriodicHeartbeater>& scheduled_callbacks);
+
+  std::shared_ptr<rpc::PeriodicTimer> heartbeat_timer_;
+  std::shared_ptr<rpc::Messenger> messenger_;
+
+  ConsensusServiceProxyPtr consensus_proxy_;
+
+  uint64_t next_id_ = 1;
+  std::unordered_map<uint64_t, PeriodicHeartbeater> peers_;
+  // Next required heartbeat time for a peer. queue_ is ordered by time.
+  // After calling a peer, push it to the end of the queue with its next due time.
+  // Since all callbacks share the same period (raft_heartbeat_interval_ms),
+  // a simple deque is sufficient and faster than a priority queue.
+  struct Callback {
+    MonoTime time; // Time for the next heartbeat for this peer.
+    uint64_t id; // id of the peer inside peers_.
+  };
+  std::deque<Callback> queue_;
+  // Protects queue_ and peers_.
+  // Peers might subscribe concurrently, and PrepareNextBatch also uses both of
+  // these members.
+  std::mutex heartbeater_lock_;
+
+  const MonoDelta batch_time_window_;
+  std::shared_ptr<ThreadPoolToken> raft_pool_token_;
+  bool closed_ = false;
+};
+
+using MultiRaftHeartbeatBatcherPtr = std::shared_ptr<MultiRaftHeartbeatBatcher>;
+
+// MultiRaftManager is responsible for managing all MultiRaftHeartbeatBatchers
+// for a given TServer (utilizes a mapping between a HostPort and the corresponding batcher).
+// MultiRaftManager allows multiple peers to share the same batcher
+// if they are connected to the same remote host.
+class MultiRaftManager : public std::enable_shared_from_this<MultiRaftManager> {
+ public:
+  MultiRaftManager(kudu::DnsResolver* dns_resolver, const scoped_refptr<MetricEntity>& entity);
+  ~MultiRaftManager() = default;
+
+  void Init(const std::shared_ptr<rpc::Messenger>& messenger, ThreadPool* raft_pool);
+
+  void Shutdown();
+
+  MultiRaftHeartbeatBatcherPtr AddOrGetBatcher(const kudu::consensus::RaftPeerPB& remote_peer_pb);
+
+ private:
+  std::shared_ptr<rpc::Messenger> messenger_;
+
+  kudu::DnsResolver* dns_resolver_;
+
+  ThreadPool* raft_pool_ = nullptr;
+
+  // Protects raft_pool_ and batchers_ during
+  // concurrent calls of AddOrGetBatcher.
+  std::mutex mutex_;
+
+  // Uses a weak_ptr to allow deallocation of unused batchers once no more
+  // consensus peers use them (and to stop the periodic timer).
+  // The MultiRaftHeartbeatBatcher destructor might be called on the raft pool
+  // using the same token. It is not safe to destruct the token there, so we
+  // keep it alive separately. Keeping one unused token per decommissioned host
+  // until restart should not cause any problems.
+  // TODO(martonka): Create a periodic timer to clean up tokens for dead hosts.
+  // Unless the cluster goes through millions of servers without restarting this
+  // TServer, this is not a problem. However, running a periodic cleanup timer
+  // (once per hour/day/week is enough) would be nice.
+  struct BatcherAndPoolToken {
+    std::weak_ptr<MultiRaftHeartbeatBatcher> batcher;
+    std::shared_ptr<ThreadPoolToken> raft_pool_token;
+    BatcherAndPoolToken(std::shared_ptr<MultiRaftHeartbeatBatcher> b,
+                        std::shared_ptr<ThreadPoolToken> t)
+        : batcher(b), raft_pool_token(std::move(t)) {}
+  };
+  std::unordered_map<HostPort, BatcherAndPoolToken, HostPortHasher> batchers_;
+
+  const MonoDelta batch_time_window_;
+
+  bool closed_ = false;
+};
+
+}  // namespace consensus
+}  // namespace kudu
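[Not part of the patch] The queue_ comment in the header above argues that, because every subscriber shares the same heartbeat period, re-pushing a fired entry with its next due time keeps a plain deque sorted, so no priority queue is needed. That scheduling shape can be sketched in isolation; the class and member names below are illustrative stand-ins, not Kudu's API:

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <unordered_map>
#include <utility>

// Minimal sketch of the deque-based heartbeat scheduler described above.
class DequeScheduler {
 public:
  // Register a callback; its first firing is one period from 'now'.
  uint64_t Subscribe(std::function<void()> cb, uint64_t now, uint64_t period) {
    uint64_t id = next_id_++;
    peers_[id] = std::move(cb);
    queue_.push_back({now + period, id});
    return id;
  }
  // Stale queue entries for removed ids are skipped lazily in RunDue().
  void Unsubscribe(uint64_t id) { peers_.erase(id); }

  // Fire everything due by 'now'; reschedule each fired peer one period out.
  // Returns the number of callbacks fired.
  int RunDue(uint64_t now, uint64_t period) {
    int fired = 0;
    while (!queue_.empty() && queue_.front().due <= now) {
      Entry entry = queue_.front();
      queue_.pop_front();
      auto it = peers_.find(entry.id);
      if (it == peers_.end()) continue;  // unsubscribed in the meantime
      it->second();
      ++fired;
      // Same period for everyone, so the back of the deque stays sorted.
      queue_.push_back({now + period, entry.id});
    }
    return fired;
  }

 private:
  struct Entry {
    uint64_t due;  // next due time for this peer
    uint64_t id;   // key into peers_
  };
  uint64_t next_id_ = 1;
  std::unordered_map<uint64_t, std::function<void()>> peers_;
  std::deque<Entry> queue_;
};
```

The invariant that makes the deque sufficient is the shared period: an entry pushed at time t is due at t + period, which is never earlier than any entry already in the queue.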
diff --git a/src/kudu/consensus/multi_raft_consensus_data.h b/src/kudu/consensus/multi_raft_consensus_data.h
new file mode 100644
index 000000000..c63a7cf88
--- /dev/null
+++ b/src/kudu/consensus/multi_raft_consensus_data.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <vector>
+
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/rpc/rpc_controller.h"
+
+namespace kudu {
+namespace consensus {
+
+// Callback to process a single heartbeat response from a MultiRaftConsensus RPC call.
+// MultiRaftConsensusResponsePB contains errors shared between the batched requests,
+// while peer-specific errors are in the BatchedNoOpConsensusResponsePB.
+using HeartbeatResponseCallback = std::function<void(const rpc::RpcController&,
+                                                     const MultiRaftConsensusResponsePB&,
+                                                     const BatchedNoOpConsensusResponsePB*)>;
+
+// Data for a single multi-raft consensus batch.
+// batch_req and response_callback_data must have the same number of elements,
+// corresponding to the same heartbeaters in order.
+// The MultiRaftUpdateConsensus RPC call will fill batch_res while preserving that order.
+struct MultiRaftConsensusData {
+  MultiRaftConsensusRequestPB batch_req;
+  MultiRaftConsensusResponsePB batch_res;
+  // Since we send out multiple RPCs in parallel, each batch needs its own RPC controller.
+  rpc::RpcController controller;
+  // Callbacks for the individual heartbeaters.
+  std::vector<HeartbeatResponseCallback> response_callback_data;
+  explicit MultiRaftConsensusData(size_t expected_size) {
+    response_callback_data.reserve(expected_size);
+  }
+};
+
+inline BatchedNoOpConsensusRequestPB ToNoOpRequest(const ConsensusRequestPB& req) {
+  BatchedNoOpConsensusRequestPB res;
+  if (req.has_tablet_id()) {
+    res.set_tablet_id(req.tablet_id());
+  }
+  if (req.has_caller_term()) {
+    res.set_caller_term(req.caller_term());
+  }
+  if (req.has_preceding_id()) {
+    *res.mutable_preceding_id() = req.preceding_id();
+  }
+  if (req.has_committed_index()) {
+    res.set_committed_index(req.committed_index());
+  }
+  if (req.has_all_replicated_index()) {
+    res.set_all_replicated_index(req.all_replicated_index());
+  }
+  if (req.has_safe_timestamp()) {
+    res.set_safe_timestamp(req.safe_timestamp());
+  }
+  if (req.has_last_idx_appended_to_leader()) {
+    res.set_last_idx_appended_to_leader(req.last_idx_appended_to_leader());
+  }
+
+  return res;
+}
+
+}  // namespace consensus
+}  // namespace kudu
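[Not part of the patch] ToNoOpRequest above copies a field into the slimmer batched message only when protobuf's has_foo() reports it set, so unset optional fields stay unset in the output. The same guard-and-copy pattern, sketched with std::optional stand-ins instead of the generated protobuf messages (all types here are hypothetical):

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>

// Stand-ins for the full and trimmed request messages.
struct FullRequest {
  std::optional<std::string> tablet_id;
  std::optional<int64_t> caller_term;
  std::optional<int64_t> committed_index;
};
struct NoOpRequest {
  std::optional<std::string> tablet_id;
  std::optional<int64_t> caller_term;
  std::optional<int64_t> committed_index;
};

// Copy only the fields that are actually set, mirroring the
// has_foo()/set_foo() guards in ToNoOpRequest above.
NoOpRequest ToNoOp(const FullRequest& req) {
  NoOpRequest res;
  if (req.tablet_id) res.tablet_id = *req.tablet_id;                // has_tablet_id()
  if (req.caller_term) res.caller_term = *req.caller_term;          // has_caller_term()
  if (req.committed_index) res.committed_index = *req.committed_index;
  return res;
}
```

Preserving "unset" rather than defaulting matters on the wire: the receiver rebuilds a full ConsensusRequestPB from the batched one, and a default-valued field is not the same as an absent one.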
diff --git a/src/kudu/consensus/peer_manager.cc b/src/kudu/consensus/peer_manager.cc
index be43243a9..a7cd11588 100644
--- a/src/kudu/consensus/peer_manager.cc
+++ b/src/kudu/consensus/peer_manager.cc
@@ -27,6 +27,7 @@
 #include "kudu/consensus/consensus_peers.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/multi_raft_batcher.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -46,6 +47,7 @@ PeerManager::PeerManager(string tablet_id,
                          string local_uuid,
                          PeerProxyFactory* peer_proxy_factory,
                          PeerMessageQueue* queue,
+                         consensus::MultiRaftManager* multi_raft_manager,
                          ThreadPoolToken* raft_pool_token,
                          scoped_refptr<log::Log> log)
     : tablet_id_(std::move(tablet_id)),
@@ -53,7 +55,8 @@ PeerManager::PeerManager(string tablet_id,
       peer_proxy_factory_(peer_proxy_factory),
       queue_(queue),
       raft_pool_token_(raft_pool_token),
-      log_(std::move(log)) {
+      log_(std::move(log)),
+      multi_raft_manager_(multi_raft_manager) {
 }
 
 PeerManager::~PeerManager() {
@@ -75,10 +78,15 @@ void PeerManager::UpdateRaftConfig(const RaftConfigPB& config) {
 
     VLOG(1) << GetLogPrefix() << "Adding remote peer. Peer: " << SecureShortDebugString(peer_pb);
     shared_ptr<Peer> remote_peer;
+    std::shared_ptr<MultiRaftHeartbeatBatcher> multi_raft_batcher = nullptr;
+    if (multi_raft_manager_) {
+      multi_raft_batcher = multi_raft_manager_->AddOrGetBatcher(peer_pb);
+    }
     Peer::NewRemotePeer(peer_pb,
                         tablet_id_,
                         local_uuid_,
                         queue_,
+                        multi_raft_batcher,
                         raft_pool_token_,
                         peer_proxy_factory_,
                         &remote_peer);
diff --git a/src/kudu/consensus/peer_manager.h b/src/kudu/consensus/peer_manager.h
index 7c2a9ee61..503eaf0c6 100644
--- a/src/kudu/consensus/peer_manager.h
+++ b/src/kudu/consensus/peer_manager.h
@@ -39,7 +39,7 @@ class Peer;
 class PeerMessageQueue;
 class PeerProxyFactory;
 class RaftConfigPB;
-
+class MultiRaftManager;
 // Manages the remote peers that pull data from the local queue and send updates to the
 // remote machines.
 class PeerManager {
@@ -50,6 +50,7 @@ class PeerManager {
               std::string local_uuid,
               PeerProxyFactory* peer_proxy_factory,
               PeerMessageQueue* queue,
+              MultiRaftManager* multi_raft_manager,
               ThreadPoolToken* raft_pool_token,
               scoped_refptr<log::Log> log);
 
@@ -80,6 +81,7 @@ class PeerManager {
   ThreadPoolToken* raft_pool_token_;
   scoped_refptr<log::Log> log_;
   PeersMap peers_;
+  MultiRaftManager* multi_raft_manager_;
   mutable simple_spinlock lock_;
 
   DISALLOW_COPY_AND_ASSIGN(PeerManager);
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 9fcef781f..b39a653aa 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -237,6 +237,7 @@ Status RaftConsensus::Create(ConsensusOptions options,
 Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                             unique_ptr<PeerProxyFactory> peer_proxy_factory,
                             scoped_refptr<log::Log> log,
+                            MultiRaftManager* multi_raft_manager,
                             unique_ptr<TimeManager> time_manager,
                             ConsensusRoundHandler* round_handler,
                             const scoped_refptr<MetricEntity>& metric_entity,
@@ -300,6 +301,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                                                        peer_uuid(),
                                                        peer_proxy_factory_.get(),
                                                        queue.get(),
+                                                       multi_raft_manager,
                                                        raft_pool_token_.get(),
                                                        log_));
   unique_ptr<PendingRounds> pending(new PendingRounds(
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 921d9210d..27579aea5 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -67,6 +67,7 @@ namespace consensus {
 class ConsensusMetadataManager;
 class ConsensusRound;
 class ConsensusRoundHandler;
+class MultiRaftManager;
 class PeerManager;
 class PeerProxyFactory;
 class PendingRounds;
@@ -163,6 +164,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   Status Start(const ConsensusBootstrapInfo& info,
                std::unique_ptr<PeerProxyFactory> peer_proxy_factory,
                scoped_refptr<log::Log> log,
+               MultiRaftManager* multi_raft_manager,
                std::unique_ptr<TimeManager> time_manager,
                ConsensusRoundHandler* round_handler,
                const scoped_refptr<MetricEntity>& metric_entity,
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index b2eb93465..51d16958e 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -238,6 +238,7 @@ class RaftConsensusQuorumTest : public KuduTest {
           boot_info,
           std::move(proxy_factory),
           logs_[i],
+          nullptr,
           std::move(time_manager),
           op_factories_.back().get(),
           metric_entity_,
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index abff6e565..8a19829ec 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -536,6 +536,7 @@ Status SysCatalogTable::SetupTablet(
       master_->messenger(),
       /*result_tracker*/nullptr,
       log,
+      nullptr,
       master_->tablet_prepare_pool(),
       master_->dns_resolver()), "failed to start system catalog replica");
 
diff --git a/src/kudu/tablet/tablet_replica-test-base.cc b/src/kudu/tablet/tablet_replica-test-base.cc
index d522949eb..cd75b1022 100644
--- a/src/kudu/tablet/tablet_replica-test-base.cc
+++ b/src/kudu/tablet/tablet_replica-test-base.cc
@@ -20,12 +20,14 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <utility>
 
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/common/common.pb.h"
+#include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus_meta_manager.h"
@@ -43,7 +45,6 @@
 #include "kudu/tablet/ops/op.h"
 #include "kudu/tablet/ops/write_op.h"
 #include "kudu/tablet/tablet-test-util.h"
-#include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_bootstrap.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
@@ -53,6 +54,12 @@
 #include "kudu/util/pb_util.h"
 #include "kudu/util/test_macros.h"
 
+namespace kudu {
+namespace tablet {
+class Tablet;
+} // namespace tablet
+} // namespace kudu
+
 using kudu::consensus::ConsensusBootstrapInfo;
 using kudu::consensus::ConsensusMetadata;
 using kudu::consensus::ConsensusMetadataManager;
@@ -178,6 +185,7 @@ Status TabletReplicaTestBase::StartReplica(const ConsensusBootstrapInfo& info) {
                                 messenger_,
                                 scoped_refptr<ResultTracker>(),
                                 log,
+                                nullptr,
                                 prepare_pool_.get(),
                                 dns_resolver_.get());
 }
@@ -223,6 +231,7 @@ Status TabletReplicaTestBase::RestartReplica(bool reset_tablet) {
                                        messenger_,
                                        scoped_refptr<ResultTracker>(),
                                        log,
+                                       nullptr,
                                        prepare_pool_.get(),
                                        dns_resolver_.get()));
   // Wait for the replica to be usable.
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index e393dab8c..fedea2f2e 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -226,6 +226,7 @@ Status TabletReplica::Start(
     shared_ptr<Messenger> messenger,
     scoped_refptr<ResultTracker> result_tracker,
     scoped_refptr<Log> log,
+    consensus::MultiRaftManager* multi_raft_manager,
     ThreadPool* prepare_pool,
     DnsResolver* resolver) {
   DCHECK(tablet) << "A TabletReplica must be provided with a Tablet";
@@ -292,6 +293,7 @@ Status TabletReplica::Start(
         bootstrap_info,
         std::move(peer_proxy_factory),
         log,
+        multi_raft_manager,
         std::move(time_manager),
         this,
         metric_entity,
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index dd1ffb02f..95212978f 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -65,6 +65,7 @@ class TxnOpDispatcherITest_PreliminaryTasksTimeout_Test;
 
 namespace consensus {
 class ConsensusMetadataManager;
+class MultiRaftManager;
 class OpStatusPB;
 class TimeManager;
 }
@@ -130,6 +131,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
                std::shared_ptr<rpc::Messenger> messenger,
                scoped_refptr<rpc::ResultTracker> result_tracker,
                scoped_refptr<log::Log> log,
+               consensus::MultiRaftManager* multi_raft_manager,
                ThreadPool* prepare_pool,
                DnsResolver* resolver);
 
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 614741c07..115a98e23 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -187,6 +187,7 @@ class TabletCopyTest : public KuduTabletTest,
                                      messenger,
                                      scoped_refptr<rpc::ResultTracker>(),
                                      log,
+                                     nullptr,
                                      prepare_pool_.get(),
                                      dns_resolver_.get()));
     ASSERT_OK(tablet_replica_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 8de6e6a6b..07d00ccf9 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -28,6 +28,7 @@
 #include <string>
 #include <type_traits>
 #include <unordered_set>
+#include <utility>
 #include <vector>
 
 #include <gflags/gflags.h>
@@ -207,6 +208,8 @@ using kudu::consensus::GetNodeInstanceResponsePB;
 using kudu::consensus::LeaderStepDownMode;
 using kudu::consensus::LeaderStepDownRequestPB;
 using kudu::consensus::LeaderStepDownResponsePB;
+using kudu::consensus::MultiRaftConsensusRequestPB;
+using kudu::consensus::MultiRaftConsensusResponsePB;
 using kudu::consensus::OpId;
 using kudu::consensus::RaftConsensus;
 using kudu::consensus::RaftPeerPB;
@@ -261,6 +264,25 @@ extern const char* CFILE_CACHE_MISS_BYTES_METRIC_NAME;
 extern const char* CFILE_CACHE_HIT_BYTES_METRIC_NAME;
 }
 
+namespace {
+  consensus::ConsensusRequestPB ToSingleRequest(
+    const consensus::MultiRaftConsensusRequestPB& parent_req,
+    const consensus::BatchedNoOpConsensusRequestPB& req) {
+    consensus::ConsensusRequestPB result;
+
+    result.set_dest_uuid(parent_req.dest_uuid());
+    result.set_caller_uuid(parent_req.caller_uuid());
+    result.set_tablet_id(req.tablet_id());
+    result.set_caller_term(req.caller_term());
+    *result.mutable_preceding_id() = req.preceding_id();
+    result.set_committed_index(req.committed_index());
+    result.set_all_replicated_index(req.all_replicated_index());
+    result.set_safe_timestamp(req.safe_timestamp());
+    result.set_last_idx_appended_to_leader(req.last_idx_appended_to_leader());
+
+    return result;
+  }
+} // namespace
 namespace tserver {
 
 const char* SCANNER_BYTES_READ_METRIC_NAME = "scanner_bytes_read";
@@ -294,24 +316,31 @@ bool LookupTabletReplicaOrRespond(TabletReplicaLookupIf* tablet_manager,
   return true;
 }
 
-template<class RespClass>
-void RespondTabletNotRunning(const scoped_refptr<TabletReplica>& replica,
-                             TabletStatePB tablet_state,
-                             RespClass* resp,
-                             RpcContext* context) {
+std::pair<Status, TabletServerErrorPB::Code> GetTabletNotRunningCode(
+    const scoped_refptr<TabletReplica>& replica,
+    const TabletStatePB& tablet_state) {
   Status s = Status::IllegalState("Tablet not RUNNING",
                                   tablet::TabletStatePB_Name(tablet_state));
-  auto error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
   if (replica->tablet_metadata()->tablet_data_state() == TABLET_DATA_TOMBSTONED ||
       replica->tablet_metadata()->tablet_data_state() == TABLET_DATA_DELETED) {
     // Treat tombstoned tablets as if they don't exist for most purposes.
     // This takes precedence over failed, since we don't reset the failed
     // status of a TabletReplica when deleting it. Only tablet copy does that.
-    error_code = TabletServerErrorPB::TABLET_NOT_FOUND;
+    return {s, TabletServerErrorPB::TABLET_NOT_FOUND};
   } else if (tablet_state == tablet::FAILED) {
     s = s.CloneAndAppend(replica->error().ToString());
-    error_code = TabletServerErrorPB::TABLET_FAILED;
+    return {s, TabletServerErrorPB::TABLET_FAILED};
   }
+  return {s, TabletServerErrorPB::TABLET_NOT_RUNNING};
+}
+
+template<class RespClass>
+void RespondTabletNotRunning(const scoped_refptr<TabletReplica>& replica,
+                             TabletStatePB tablet_state,
+                             RespClass* resp,
+                             RpcContext* context) {
+  auto [s, error_code] = GetTabletNotRunningCode(replica, tablet_state);
   SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
 }
 
@@ -1777,6 +1806,67 @@ void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req,
   context->RespondSuccess();
 }
 
+void ConsensusServiceImpl::MultiRaftUpdateConsensus(
+    const MultiRaftConsensusRequestPB* req,
+    MultiRaftConsensusResponsePB* resp,
+    RpcContext* context) {
+  DVLOG(3) << "Received Batched Consensus Update RPC: " << SecureDebugString(*req);
+  if (!CheckUuidMatchOrRespond(tablet_manager_, "UpdateConsensus", req, resp, context)) {
+    return;
+  }
+  resp->set_responder_uuid(tablet_manager_->NodeInstance().permanent_uuid());
+  for (const auto& single_req : req->consensus_requests()) {
+    auto* single_resp = resp->add_consensus_responses();
+    auto set_error = [single_resp](const Status& s, const TabletServerErrorPB::Code& error_code) {
+      auto error = single_resp->mutable_error();
+      StatusToPB(s, error->mutable_status());
+      error->set_code(error_code);
+    };
+    scoped_refptr<TabletReplica> replica;
+    Status s = tablet_manager_->GetTabletReplica(single_req.tablet_id(), &replica);
+    if (PREDICT_FALSE(!s.ok())) {
+      set_error(s, TabletServerErrorPB::TABLET_NOT_FOUND);
+      continue;  // move on to the next request
+    }
+
+    const auto& state = replica->state();
+    if (PREDICT_FALSE(state != tablet::RUNNING)) {
+      auto [s, error_code] = GetTabletNotRunningCode(replica, state);
+      set_error(s, error_code);
+      continue;
+    }
+    shared_ptr<RaftConsensus> consensus = replica->shared_consensus();
+
+    if (!consensus) {
+      set_error(Status::ServiceUnavailable("Raft Consensus unavailable",
+                                           "Tablet replica not initialized"),
+                TabletServerErrorPB::TABLET_NOT_RUNNING);
+      continue;
+    }
+
+    const auto req2 = ToSingleRequest(*req, single_req);
+    auto resp2 = ConsensusResponsePB();
+    // TODO(martonka): We know that this is a no-op. So maybe we could handle it
+    // in a more efficient way.
+    s = consensus->Update(&req2, &resp2);
+    if (PREDICT_FALSE(!s.ok())) {
+      // Clear the response first, since a partially-filled response could
+      // result in confusing a caller, or in having missing required fields
+      // in embedded optional messages.
+      single_resp->Clear();
+      set_error(s, TabletServerErrorPB::UNKNOWN_ERROR);
+      continue;
+    }
+    single_resp->set_responder_term(resp2.responder_term());
+    *single_resp->mutable_status() = resp2.status();
+    single_resp->set_server_quiescing(resp2.server_quiescing());
+    if (resp2.has_error()) {
+      *single_resp->mutable_error() = resp2.error();
+    }
+  }
+  context->RespondSuccess();
+}
+
 void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req,
                                                 VoteResponsePB* resp,
                                                 RpcContext* context) {
diff --git a/src/kudu/tserver/tablet_service.h b/src/kudu/tserver/tablet_service.h
index ba482b9bc..2765fd371 100644
--- a/src/kudu/tserver/tablet_service.h
+++ b/src/kudu/tserver/tablet_service.h
@@ -52,6 +52,8 @@ class ChangeConfigRequestPB;
 class ChangeConfigResponsePB;
 class ConsensusRequestPB;
 class ConsensusResponsePB;
+class MultiRaftConsensusRequestPB;
+class MultiRaftConsensusResponsePB;
 class GetConsensusStateRequestPB;
 class GetConsensusStateResponsePB;
 class GetLastOpIdRequestPB;
@@ -256,6 +258,11 @@ class ConsensusServiceImpl : public consensus::ConsensusServiceIf {
                        consensus::ConsensusResponsePB* resp,
                        rpc::RpcContext* context) override;
 
+  void MultiRaftUpdateConsensus(
+      const consensus::MultiRaftConsensusRequestPB* req,
+      consensus::MultiRaftConsensusResponsePB* resp,
+      rpc::RpcContext* context) override;
+
   void RequestConsensusVote(const consensus::VoteRequestPB* req,
                             consensus::VoteResponsePB* resp,
                             rpc::RpcContext* context) override;
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index f7f3269c1..f4af2c99b 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -43,6 +43,7 @@
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/multi_raft_batcher.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/opid_util.h"
 #include "kudu/consensus/quorum_util.h"
@@ -346,7 +347,10 @@ TSTabletManager::TSTabletManager(TabletServer* server)
     shutdown_latch_(1),
     metric_registry_(server->metric_registry()),
     tablet_copy_metrics_(server->metric_entity()),
-    state_(MANAGER_INITIALIZING) {
+    state_(MANAGER_INITIALIZING),
+    multi_raft_manager_(std::make_unique<consensus::MultiRaftManager>(
+      server_->dns_resolver(), server_->metric_entity())) {
+
   // A heartbeat msg without statistics will be considered to be from an old
   // version, thus it's necessary to trigger updating stats as soon as possible.
   next_update_time_ = MonoTime::Now();
@@ -448,6 +452,8 @@ Status TSTabletManager::Init(Timer* start_tablets,
                              std::atomic<int>* tablets_total) {
   CHECK_EQ(state(), MANAGER_INITIALIZING);
 
+  multi_raft_manager_->Init(server_->messenger(), server_->raft_pool());
+
   // Start the tablet copy thread pool. We set a max queue size of 0 so that if
   // the number of requests exceeds the number of threads, a
   // SERVICE_UNAVAILABLE error may be returned to the remote caller.
@@ -1433,6 +1439,7 @@ void TSTabletManager::OpenTablet(const scoped_refptr<tablet::TabletReplica>& rep
                        server_->messenger(),
                        server_->result_tracker(),
                        log,
+                       multi_raft_manager_.get(),
                        server_->tablet_prepare_pool(),
                        server_->dns_resolver());
     if (!s.ok()) {
@@ -1535,6 +1542,10 @@ void TSTabletManager::Shutdown() {
     txn_status_manager_pool_->Shutdown();
   }
 
+  if (multi_raft_manager_ != nullptr) {
+    multi_raft_manager_->Shutdown();
+  }
+
   // Take a snapshot of the replicas list -- that way we don't have to hold
   // on to the lock while shutting them down, which might cause a lock
   // inversion. (see KUDU-308 for example).
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 289ed0108..7597928b0 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -66,6 +66,7 @@ namespace consensus {
 class ConsensusMetadataManager;
 class OpId;
 class StartTabletCopyRequestPB;
+class MultiRaftManager;
 } // namespace consensus
 
 namespace master {
@@ -504,6 +505,8 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   scoped_refptr<Histogram> create_tablet_run_time_;
   scoped_refptr<Histogram> delete_tablet_run_time_;
 
+  std::unique_ptr<consensus::MultiRaftManager> multi_raft_manager_;
+
   DISALLOW_COPY_AND_ASSIGN(TSTabletManager);
 };
 

