This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 4999c30d1 KUDU-1973: Send no-op heartbeat operations batched - PART 1
4999c30d1 is described below
commit 4999c30d1fc04123d19258ebda5478303d8e6644
Author: Zoltan Martonka <[email protected]>
AuthorDate: Fri Aug 22 15:25:26 2025 +0000
KUDU-1973: Send no-op heartbeat operations batched - PART 1
Due to periodically sent heartbeat messages, a Kudu cluster
with thousands of tablets still consumes significant CPU and
network resources, even without any user activity. When
multiple messages are sent to the same host within a short
time frame, they can be batched to reduce CPU usage.
This results in fewer RPC calls, and some fields can also be
shared between the no-op messages, further reducing network
traffic.
Processing heartbeats for the same host together also reduces
the CPU load. We batch only the periodic heartbeats; the
leadership no-op heartbeats are still sent unbatched, in a
separate task. Batching only the periodic no-op heartbeats
allows us to process the batch request on a single thread,
since an empty periodic update does not take much time.
The shared timer submits a single shared task for the
heartbeat senders that are due to send a heartbeat. However,
if it encounters a consensus peer with actual messages, that
peer is left out of the batch and a separate task is submitted
on the RPC threads using the consensus peer's RPC token. This
ensures that the other no-ops in that batch are sent out in a
timely manner.
Using this method instead of buffering and periodically
flushing the buffer has two significant advantages:
+ If X heartbeats are batched together, instead of X timer
calls and X further task submissions to RPC threads, we will
have 1 timer call and [1, X+1] callbacks on the RPC thread -
usually just 1-2 callbacks.
+ Since there is no buffering, when a write request arrives,
there is no need for flush/discard logic.
Note that if some of the responses in the batch trigger
further updates, ProcessResponse calls DoProcessResponse on a
separate thread. So processing the response on a single thread
is fine as well; there is no need to change that logic.
Measurement:
I started a 1 master + 4 TS setup using t3.xlarge instances.
Created 200 tables with 10 hash partitions (RF=3 and 1,000
initial rows). Then performed the following random sampling:
For 1 tablet server (since the tablets are newly created, they
are distributed evenly):
+ Measure metrics for 40 seconds.
+ Also call:
sudo perf stat -e sched:sched_* -p <tserver-pid> sleep 40
before starting the write workload.
+ Right after the start of the 40-second window, start
0/3/6/.../24 separate write tasks (from 5 different VMs),
each writing 1 million rows into a random table.
(The tables are randomly selected, so the short-term history
of load is "the same" for any value after the first few
measurements.)
I started the cluster with 3 configurations (always starting
with a fresh cluster):
1. Batching off
2. Batching on, max batch size = 10
3. Batching on, max batch size = 30
The results were the following ("client task runtimes" is the
execution time of the binary writing the rows via
kudu_client.so):
========== write count: 0:==========
Runs with batching off: 15, 10: 19, 30: 18
Off client task runtimes avg: n/a, min n/a, max n/a, med n/a
10 client task runtimes avg: n/a, min n/a, max n/a, med n/a
30 client task runtimes avg: n/a, min n/a, max n/a, med n/a
Change (10 & 30): n/a% n/a%
Off cpu_stime avg: 13568.533333333333, min 9419, max 16947, med 13836
10 cpu_stime avg: 6807.368421052632, min 4772, max 8788, med 6899
30 cpu_stime avg: 5833.722222222223, min 3279, max 7128, med 6045.0
Change (10 & 30): -49.829740224544295% -57.005506203896594%
Off cpu_utime avg: 40332.86666666667, min 19340, max 50660, med 46046
10 cpu_utime avg: 28121.105263157893, min 8846, max 41993, med 27630
30 cpu_utime avg: 26349.055555555555, min 9592, max 39603, med 26169.0
Change (10 & 30): -30.277444706406786% -34.67100721275563%
Off rpc_incoming_queue_time avg: 85605.4, min 85252, max 85767, med
85666
10 rpc_incoming_queue_time avg: 8997.263157894737, min 8946, max 9031,
med 8997
30 rpc_incoming_queue_time avg: 3497.0555555555557, min 3470, max
3532, med 3496.5
Change (10 & 30): -89.4898415778739% -95.91491242894075%
Off utime+stime avg: 53901.4, min 32665, max 65281, med 56304
10 utime+stime avg: 34928.47368421053, min 14796, max 49508, med 35016
30 utime+stime avg: 32182.777777777777, min 15019, max 46496, med
31922.0
Change (10 & 30): -35.19932008405993% -40.29324325939998%
Off stat_runtime avg: 54531223159.181816, min 31976380571, max
65955396580, med 63846401240
10 stat_runtime avg: 32422918329.933334, min 15178573784, max
49774116201, med 31081119380
30 stat_runtime avg: 31854018849.733334, min 15354712860, max
46880459417, med 33664261243
Change (10 & 30): -40.5424700720764% -41.58572464668833%
Off switch avg: 642389.0, min 623210, max 653324, med 649967
10 switch avg: 261494.33333333334, min 219716, max 270616, med 264348
30 switch avg: 249353.33333333334, min 247260, max 252089, med 249239
Change (10 & 30): -59.29346029690213% -61.18343661965984%
========== write count: 3:==========
Runs with batching off: 28, 10: 23, 30: 13
Off client task runtimes avg: 6.203379134337108, min 5.632460594177246,
max 6.844575881958008, med 6.185922980308533
10 client task runtimes avg: 6.370108393655307, min 5.922394752502441,
max 7.963289260864258, med 6.355126142501831
30 client task runtimes avg: 6.1353193796598, min 5.597020149230957,
max 7.383537769317627, med 6.066960334777832
Change (10 & 30): 2.6877167380487066% -1.097140013586817%
Off cpu_stime avg: 14144.035714285714, min 9547, max 16631, med 15466.5
10 cpu_stime avg: 7969.0, min 4657, max 10297, med 8943
30 cpu_stime avg: 5950.0, min 3908, max 8000, med 6528
Change (10 & 30): -43.6582305009936% -57.93279852941547%
Off cpu_utime avg: 43743.46428571428, min 28862, max 54178, med 44754.5
10 cpu_utime avg: 42348.434782608696, min 17163, max 47387, med 44822
30 cpu_utime avg: 32516.30769230769, min 14061, max 44001, med 35589
Change (10 & 30): -3.1891152790136323% -25.665906385638394%
Off rpc_incoming_queue_time avg: 89446.75, min 88955, max 89839, med
89493.5
10 rpc_incoming_queue_time avg: 13017.217391304348, min 12267, max
13414, med 13063
30 rpc_incoming_queue_time avg: 7832.2307692307695, min 7498, max
8057, med 7876
Change (10 & 30): -85.44696437678915% -91.24369441122145%
Off utime+stime avg: 57887.5, min 40922, max 70702, med 58746.5
10 utime+stime avg: 50317.434782608696, min 24283, max 56899, med 53239
30 utime+stime avg: 38466.307692307695, min 20589, max 50935, med 43039
Change (10 & 30): -13.077201843906373% -33.54988954038834%
Off stat_runtime avg: 57709248396.1579, min 42352963849, max
69618765193, med 55313027027
10 stat_runtime avg: 49429512557.3125, min 20883553890, max
54484284473, med 52754845564.0
30 stat_runtime avg: 38284725561.85714, min 20290863097, max
50310539913, med 41356308570
Change (10 & 30): -14.347329187182133% -33.65928923724118%
Off switch avg: 673106.2631578947, min 661281, max 691078, med 673573
10 switch avg: 291551.25, min 275920, max 297571, med 293681.5
30 switch avg: 272828.85714285716, min 268207, max 278893, med 272300
Change (10 & 30): -56.685702398284036% -59.46719380341614%
========== write count: 6:==========
Runs with batching off: 28, 10: 25, 30: 13
Off client task runtimes avg: 6.88737925745192, min 6.060053825378418,
max 8.26358151435852, med 6.81439471244812
10 client task runtimes avg: 6.90557310740153, min 6.297971725463867,
max 8.041281938552856, med 6.896089553833008
30 client task runtimes avg: 6.4807085410142555, min
6.009506940841675, max 6.913873195648193, med 6.4735718965530396
Change (10 & 30): 0.26416216197073794% -5.904578523066817%
Off cpu_stime avg: 13949.535714285714, min 10154, max 18245, med 11736.5
10 cpu_stime avg: 7253.72, min 5144, max 10370, med 6352
30 cpu_stime avg: 6419.0, min 4378, max 9172, med 5325
Change (10 & 30): -48.00027650689859% -53.98413157631974%
Off cpu_utime avg: 50218.892857142855, min 27029, max 59428, med 54704.0
10 cpu_utime avg: 43645.04, min 28473, max 52156, med 47802
30 cpu_utime avg: 38955.230769230766, min 19804, max 46717, med 38318
Change (10 & 30): -13.09039782267487% -22.429132637299887%
Off rpc_incoming_queue_time avg: 93353.53571428571, min 92648, max
93824, med 93383.5
10 rpc_incoming_queue_time avg: 16530.72, min 10724, max 17280, med
16793
30 rpc_incoming_queue_time avg: 11754.0, min 11185, max 12380, med
11716
Change (10 & 30): -82.29234717944342% -87.40915391145565%
Off utime+stime avg: 64168.42857142857, min 37183, max 77202, med
66883.0
10 utime+stime avg: 50898.76, min 34228, max 60366, med 54281
30 utime+stime avg: 45374.230769230766, min 27342, max 54821, med 45185
Change (10 & 30): -20.679435147235292% -29.288854691645128%
Off stat_runtime avg: 67253597832.07692, min 44608096874, max
72681918723, med 69457593188
10 stat_runtime avg: 49455203107.55556, min 34066502223, max
57746660961, med 52328569829
30 stat_runtime avg: 44232893390.8, min 24621447550, max 53333232716,
med 46963805079
Change (10 & 30): -26.46459862111992% -34.22969950062223%
Off switch avg: 694087.7692307692, min 663129, max 713254, med 696097
10 switch avg: 311993.22222222225, min 279392, max 331949, med 315909
30 switch avg: 294191.6, min 284201, max 301534, med 298264
Change (10 & 30): -55.04988906979411% -57.61463995741616%
========== write count: 9:==========
Runs with batching off: 12, 10: 28, 30: 11
Off client task runtimes avg: 8.548090826582026, min 7.529751539230347,
max 9.48080325126648, med 8.539497256278992
10 client task runtimes avg: 8.2908557114147, min 7.11676287651062,
max 11.546950578689575, med 8.205906629562378
30 client task runtimes avg: 7.6591967903837865, min
6.522926092147827, max 8.579655647277832, med 7.697844862937927
Change (10 & 30): -3.009269793523961% -10.398743464845307%
Off cpu_stime avg: 15005.166666666666, min 11661, max 19840, med 12412.0
10 cpu_stime avg: 8288.464285714286, min 5129, max 12050, med 7104.5
30 cpu_stime avg: 6007.545454545455, min 4793, max 9926, med 5750
Change (10 & 30): -44.76259764493816% -59.96348732406312%
Off cpu_utime avg: 59056.666666666664, min 56031, max 63356, med 59377.0
10 cpu_utime avg: 49433.57142857143, min 26090, max 57382, med 52271.5
30 cpu_utime avg: 44772.818181818184, min 27573, max 51210, med 50317
Change (10 & 30): -16.294680653770786% -24.186682539112404%
Off rpc_incoming_queue_time avg: 96753.83333333333, min 96291, max
97393, med 96774.5
10 rpc_incoming_queue_time avg: 21073.464285714286, min 20380, max
21682, med 21082.0
30 rpc_incoming_queue_time avg: 16262.272727272728, min 15920, max
16578, med 16283
Change (10 & 30): -78.21950453052064% -83.19211532296974%
Off utime+stime avg: 74061.83333333333, min 67692, max 79069, med
73660.0
10 utime+stime avg: 57722.03571428572, min 31219, max 68704, med
61430.5
30 utime+stime avg: 50780.36363636364, min 32366, max 60608, med 56080
Change (10 & 30): -22.06237259278523% -31.435178754198212%
Off stat_runtime avg: 74426347984.6, min 69044550939, max 76319774349,
med 75772265736
10 stat_runtime avg: 60146731951.2, min 56539088994, max 66356993395,
med 60227311137.5
30 stat_runtime avg: 58533466007.0, min 58533466007, max 58533466007,
med 58533466007
Change (10 & 30): -19.18623769683646% -21.353838268254812%
Off switch avg: 735295.8, min 709459, max 746328, med 741901
10 switch avg: 339284.5, min 316234, max 365507, med 333199.5
30 switch avg: 323555.0, min 323555, max 323555, med 323555
Change (10 & 30): -53.857413574237746% -55.99662067973189%
========== write count: 12:==========
Runs with batching off: 28, 10: 26, 30: 19
Off client task runtimes avg: 10.253776160994573, min
9.013848304748535, max 11.68369174003601, med 10.17331576347351
10 client task runtimes avg: 10.120543375229223, min 8.6819486618042,
max 12.503127098083496, med 9.816626071929932
30 client task runtimes avg: 9.125201345535746, min 6.506812334060669,
max 10.293994665145874, med 9.27278220653534
Change (10 & 30): -1.2993533667349566% -11.006431169737564%
Off cpu_stime avg: 14449.607142857143, min 11600, max 21255, med 12862.5
10 cpu_stime avg: 8505.76923076923, min 6634, max 12915, med 7722.0
30 cpu_stime avg: 7201.578947368421, min 5122, max 11262, med 6769
Change (10 & 30): -41.13494473118685% -50.16072841171763%
Off cpu_utime avg: 61044.82142857143, min 44149, max 66881, med 62788.5
10 cpu_utime avg: 56311.92307692308, min 44619, max 64298, med 56596.5
30 cpu_utime avg: 50936.84210526316, min 27324, max 58087, med 54226
Change (10 & 30): -7.75315291434887% -16.558291246925204%
Off rpc_incoming_queue_time avg: 100650.96428571429, min 99736, max
101318, med 100699.5
10 rpc_incoming_queue_time avg: 24776.884615384617, min 22855, max
25639, med 24807.5
30 rpc_incoming_queue_time avg: 20277.105263157893, min 19619, max
21074, med 20298
Change (10 & 30): -75.38336091341226% -79.8540377560636%
Off utime+stime avg: 75494.42857142857, min 55942, max 86122, med
76436.0
10 utime+stime avg: 64817.692307692305, min 51253, max 74834, med
64551.0
30 utime+stime avg: 58138.42105263158, min 32446, max 65996, med 61492
Change (10 & 30): -14.14241615675591% -22.989785931521702%
Off stat_runtime avg: 76728076228.42857, min 66549332918, max
82294242231, med 80703935110
10 stat_runtime avg: 62724548520.2, min 49469140539, max 71325822979,
med 64903018497
30 stat_runtime avg: 54956350974.333336, min 45573244914, max
62202256097, med 57093551912
Change (10 & 30): -18.250852095572444% -28.375174153041748%
Off switch avg: 769651.1428571428, min 741196, max 783936, med 777328
10 switch avg: 364841.8, min 324393, max 403493, med 361469
30 switch avg: 342820.0, min 312424, max 363162, med 352874
Change (10 & 30): -52.5964713512133% -55.45774170783868%
========== write count: 15:==========
Runs with batching off: 29, 10: 25, 30: 11
Off client task runtimes avg: 12.093589447937717, min
9.694162607192993, max 13.974745273590088, med 11.848691463470459
10 client task runtimes avg: 12.023993889490763, min 9.89328384399414,
max 14.774570941925049, med 11.77902603149414
30 client task runtimes avg: 10.500564222624808, min
6.7602620124816895, max 12.283529281616211, med 10.966847896575928
Change (10 & 30): -0.5754747897351642% -13.172476477482565%
Off cpu_stime avg: 15818.413793103447, min 12330, max 22206, med 13621
10 cpu_stime avg: 10410.12, min 7513, max 14940, med 8576
30 cpu_stime avg: 8565.09090909091, min 7139, max 12624, med 7520
Change (10 & 30): -34.18986166275009% -45.853667623582204%
Off cpu_utime avg: 64650.724137931036, min 42791, max 69649, med 66646
10 cpu_utime avg: 60900.88, min 47295, max 68286, med 61477
30 cpu_utime avg: 57182.27272727273, min 48519, max 60889, med 57494
Change (10 & 30): -5.8001579842026585% -11.551999626059118%
Off rpc_incoming_queue_time avg: 104149.72413793103, min 102866, max
105221, med 104088
10 rpc_incoming_queue_time avg: 28739.68, min 27077, max 30125, med
28748
30 rpc_incoming_queue_time avg: 24547.454545454544, min 23686, max
25125, med 24814
Change (10 & 30): -72.40541898897541% -76.43061011573585%
Off utime+stime avg: 80469.13793103448, min 61847, max 90811, med 80286
10 utime+stime avg: 71311.0, min 60967, max 82608, med 71005
30 utime+stime avg: 65747.36363636363, min 60394, max 69941, med 66757
Change (10 & 30): -11.380932077193862% -18.294932284832033%
Off stat_runtime avg: 81980607740.77777, min 55653565554, max
87411163108, med 85442439087
10 stat_runtime avg: 67729727090.3, min 55481286826, max 78981283495,
med 67728369751.0
30 stat_runtime avg: 58272593883.666664, min 51761458017, max
64954871741, med 58101451893
Change (10 & 30): -17.383233722222425% -28.91905111520485%
Off switch avg: 806852.8888888889, min 771841, max 826717, med 809368
10 switch avg: 405952.4, min 369202, max 436256, med 413122.0
30 switch avg: 357860.3333333333, min 340666, max 367513, med 365402
Change (10 & 30): -49.686937285552254% -55.64738773803734%
========== write count: 18:==========
Runs with batching off: 17, 10: 25, 30: 14
Off client task runtimes avg: 14.400622643676458, min
12.48328423500061, max 17.490095376968384, med 14.047804951667786
10 client task runtimes avg: 13.62668704509735, min
11.661161422729492, max 15.913615226745605, med 13.4107586145401
30 client task runtimes avg: 12.081949627588665, min
6.308183908462524, max 13.923750400543213, med 12.532055974006653
Change (10 & 30): -5.374320386896292% -16.101199742956663%
Off cpu_stime avg: 16935.58823529412, min 13577, max 22825, med 14263
10 cpu_stime avg: 11495.4, min 8416, max 16228, med 9082
30 cpu_stime avg: 9291.07142857143, min 7162, max 14057, med 8078.5
Change (10 & 30): -32.12281829075564% -45.13877345453734%
Off cpu_utime avg: 70929.58823529411, min 67674, max 76405, med 70892
10 cpu_utime avg: 64712.68, min 52765, max 72801, med 65688
30 cpu_utime avg: 58736.642857142855, min 45035, max 64122, med 59485.0
Change (10 & 30): -8.764901065928676% -17.190210293768672%
Off rpc_incoming_queue_time avg: 107683.5294117647, min 106186, max
108626, med 107884
10 rpc_incoming_queue_time avg: 32797.88, min 31176, max 33893, med
32851
30 rpc_incoming_queue_time avg: 28709.35714285714, min 28256, max
29613, med 28559.5
Change (10 & 30): -69.54234303132272% -73.33913802817781%
Off utime+stime avg: 87865.17647058824, min 81530, max 98711, med 85818
10 utime+stime avg: 76208.08, min 65998, max 83642, med 75503
30 utime+stime avg: 68027.71428571429, min 52197, max 77436, med
69101.0
Change (10 & 30): -13.267026754894529% -22.577160806721064%
Off stat_runtime avg: 89877829083.33333, min 85904071609, max
93775583142, med 90007821176.0
10 stat_runtime avg: 71741065414.63637, min 60447250324, max
78006781555, med 73925084586
30 stat_runtime avg: 61287185255.0, min 56450910546, max 71324482809,
med 58686673832.5
Change (10 & 30): -20.179352186934597% -31.810563428078055%
Off switch avg: 844333.5, min 833024, max 855182, med 845283.0
10 switch avg: 439501.7272727273, min 410184, max 471723, med 442107
30 switch avg: 387624.75, min 366818, max 405787, med 388947.0
Change (10 & 30): -47.94690400502558% -54.09103748696458%
========== write count: 21:==========
Runs with batching off: 14, 10: 32, 30: 15
Off client task runtimes avg: 16.00026828658824, min 13.41559386253357,
max 17.893533945083618, med 15.739280700683594
10 client task runtimes avg: 15.867586015945388, min
10.710279941558838, max 21.145692586898804, med 15.130183696746826
30 client task runtimes avg: 13.433500899208916, min
6.494636297225952, max 15.636562824249268, med 14.214406967163086
Change (10 & 30): -0.829250286722194% -16.0420271798245%
Off cpu_stime avg: 17945.428571428572, min 13430, max 24512, med 15011.5
10 cpu_stime avg: 10738.25, min 8612, max 17861, med 9526.5
30 cpu_stime avg: 10200.266666666666, min 7895, max 15310, med 8659
Change (10 & 30): -40.161640847649224% -43.15952597026966%
Off cpu_utime avg: 72935.64285714286, min 65139, max 79458, med 73527.0
10 cpu_utime avg: 71266.0625, min 51727, max 79497, med 70270.5
30 cpu_utime avg: 63485.933333333334, min 50417, max 69125, med 65556
Change (10 & 30): -2.2891144737190006% -12.956229839940425%
Off rpc_incoming_queue_time avg: 111131.85714285714, min 109993, max
112204, med 111220.5
10 rpc_incoming_queue_time avg: 36567.9375, min 34429, max 38382, med
36493.5
30 rpc_incoming_queue_time avg: 32505.6, min 31728, max 33168, med
32553
Change (10 & 30): -67.09500008355582% -70.75042131419175%
Off utime+stime avg: 90881.07142857143, min 78856, max 98983, med
90825.0
10 utime+stime avg: 82004.3125, min 60339, max 96027, med 80178.5
30 utime+stime avg: 73686.2, min 58312, max 83714, med 74202
Change (10 & 30): -9.76744528760115% -18.920190044288653%
Off stat_runtime avg: 94648239082.2, min 92043681683, max 96604701517,
med 95079846312
10 stat_runtime avg: 85979736782.5, min 79247833687, max 94078032523,
med 84953429029.5
30 stat_runtime avg: 74313071072.0, min 63913748224, max 80499907081,
med 76419314491.5
Change (10 & 30): -9.15865142738851% -21.48499349527183%
Off switch avg: 897395.0, min 875049, max 913416, med 901805
10 switch avg: 474644.8333333333, min 453589, max 501224, med 473482.0
30 switch avg: 434748.75, min 420292, max 452761, med 432971.0
Change (10 & 30): -47.10859394878138% -51.55436012012547%
========== write count: 24:==========
Runs with batching off: 18, 10: 21, 30: 12
Off client task runtimes avg: 17.88868013134709, min
14.400426149368286, max 20.726062059402466, med 17.609864950180054
10 client task runtimes avg: 17.408534667600932, min
9.953590393066406, max 21.59314775466919, med 17.0849187374115
30 client task runtimes avg: 15.434335373004554, min 7.21691107749939,
max 17.90212059020996, med 16.086348295211792
Change (10 & 30): -2.684074287318594% -13.720099752030801%
Off cpu_stime avg: 18045.666666666668, min 14732, max 26013, med 15529.0
10 cpu_stime avg: 11599.238095238095, min 8406, max 18696, med 10275
30 cpu_stime avg: 11924.666666666666, min 8512, max 16717, med 9473.0
Change (10 & 30): -35.72286183993519% -33.91950052644218%
Off cpu_utime avg: 78219.0, min 72128, max 81487, med 78120.0
10 cpu_utime avg: 75045.85714285714, min 62685, max 86617, med 74246
30 cpu_utime avg: 69422.08333333333, min 61531, max 73578, med 70479.5
Change (10 & 30): -4.056741785426632% -11.246521518642105%
Off rpc_incoming_queue_time avg: 114673.38888888889, min 112955, max
116400, med 114882.5
10 rpc_incoming_queue_time avg: 40207.28571428572, min 33425, max
41661, med 40531
30 rpc_incoming_queue_time avg: 36444.083333333336, min 35255, max
37595, med 36542.0
Change (10 & 30): -64.93756214596223% -68.21923230275743%
Off utime+stime avg: 96264.66666666667, min 86860, max 106203, med
95218.0
10 utime+stime avg: 86645.09523809524, min 71091, max 96892, med 85338
30 utime+stime avg: 81346.75, min 70043, max 88279, med 82009.5
Change (10 & 30): -9.992837207737804% -15.496772786138225%
Off stat_runtime avg: 97726065525.8, min 92716532372, max 100737581795,
med 97800868584
10 stat_runtime avg: 86468693080.0, min 82172604670, max 88534687430,
med 87583740110.0
30 stat_runtime avg: 81758720786.6, min 77844194290, max 84349902830,
med 82248094233
Change (10 & 30): -11.519314100318523% -16.33888016803927%
Off switch avg: 911795.0, min 899299, max 929683, med 906367
10 switch avg: 511426.5, min 475979, max 529168, med 520279.5
30 switch avg: 472257.4, min 454054, max 495222, med 464972
Change (10 & 30): -43.90992492830077% -48.20574800256636%
Change-Id: Ie92ba4de5eae00d56cd513cb644dce8fb6e14538
Reviewed-on: http://gerrit.cloudera.org:8080/22867
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/consensus/CMakeLists.txt | 1 +
src/kudu/consensus/consensus.proto | 38 +++
src/kudu/consensus/consensus_peers-test.cc | 3 +
src/kudu/consensus/consensus_peers.cc | 168 +++++++++--
src/kudu/consensus/consensus_peers.h | 20 +-
src/kudu/consensus/consensus_queue.cc | 15 +-
src/kudu/consensus/consensus_queue.h | 3 +-
src/kudu/consensus/multi_raft_batcher.cc | 306 +++++++++++++++++++++
src/kudu/consensus/multi_raft_batcher.h | 184 +++++++++++++
src/kudu/consensus/multi_raft_consensus_data.h | 78 ++++++
src/kudu/consensus/peer_manager.cc | 10 +-
src/kudu/consensus/peer_manager.h | 4 +-
src/kudu/consensus/raft_consensus.cc | 2 +
src/kudu/consensus/raft_consensus.h | 2 +
src/kudu/consensus/raft_consensus_quorum-test.cc | 1 +
src/kudu/master/sys_catalog.cc | 1 +
src/kudu/tablet/tablet_replica-test-base.cc | 11 +-
src/kudu/tablet/tablet_replica.cc | 2 +
src/kudu/tablet/tablet_replica.h | 2 +
.../tserver/tablet_copy_source_session-test.cc | 1 +
src/kudu/tserver/tablet_service.cc | 106 ++++++-
src/kudu/tserver/tablet_service.h | 7 +
src/kudu/tserver/ts_tablet_manager.cc | 13 +-
src/kudu/tserver/ts_tablet_manager.h | 3 +
24 files changed, 937 insertions(+), 44 deletions(-)
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index 5d20b66cc..514bb2279 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -103,6 +103,7 @@ set(CONSENSUS_SRCS
consensus_queue.cc
leader_election.cc
log_cache.cc
+ multi_raft_batcher.cc
peer_manager.cc
pending_rounds.cc
quorum_util.cc
diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index dea1eed93..fa10fa3de 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -416,6 +416,41 @@ message ConsensusResponsePB {
optional tserver.TabletServerErrorPB error = 999;
}
+// Same as ConsensusRequestPB. But dest_uuid and caller_uuid are shared between
+// messages.
+message BatchedNoOpConsensusRequestPB {
+ required string tablet_id = 1;
+ required int64 caller_term = 3;
+ optional OpId preceding_id = 4;
+ optional int64 committed_index = 8;
+ optional int64 all_replicated_index = 9;
+ optional fixed64 safe_timestamp = 10;
+ optional int64 last_idx_appended_to_leader = 11;
+}
+
+// Batched NoOp consensus updates.
+message MultiRaftConsensusRequestPB {
+ repeated BatchedNoOpConsensusRequestPB consensus_requests = 1;
+ optional bytes dest_uuid = 2;
+ required bytes caller_uuid = 3;
+}
+
+// Same as ConsensusResponsePB. But responder_uuid and certain type of errors
+// are shared between messages.
+message BatchedNoOpConsensusResponsePB {
+ optional int64 responder_term = 1;
+ optional ConsensusStatusPB status = 2;
+ optional bool server_quiescing = 3;
+ optional tserver.TabletServerErrorPB error = 999;
+}
+
+message MultiRaftConsensusResponsePB {
+ repeated BatchedNoOpConsensusResponsePB consensus_responses = 1;
+ optional bytes responder_uuid = 2;
+ // For errors shared between tablets (e.g. wrong caller id)
+ optional tserver.TabletServerErrorPB error = 999;
+}
+
// A message reflecting the status of an in-flight op.
message OpStatusPB {
required OpId op_id = 1;
@@ -587,6 +622,9 @@ service ConsensusService {
// Analogous to AppendEntries in Raft, but only used for followers.
rpc UpdateConsensus(ConsensusRequestPB) returns (ConsensusResponsePB);
+ // Batched call of ConsensusRequestPB. Only exists for performance reasons.
+ rpc MultiRaftUpdateConsensus(MultiRaftConsensusRequestPB) returns (MultiRaftConsensusResponsePB);
+
// RequestVote() from Raft.
rpc RequestConsensusVote(VoteRequestPB) returns (VoteResponsePB);
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index 236f730f9..ebcaf98ab 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -148,6 +148,7 @@ class ConsensusPeersTest : public KuduTest {
kTabletId,
kLeaderUuid,
message_queue_.get(),
+ nullptr, // We do not use heartbeat batching.
raft_pool_token_.get(),
&factory,
peer);
@@ -288,6 +289,7 @@ TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
kTabletId,
kLeaderUuid,
message_queue_.get(),
+ nullptr, // We do not use heartbeat batching.
raft_pool_token_.get(),
&factory,
&peer);
@@ -326,6 +328,7 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
kTabletId,
kLeaderUuid,
message_queue_.get(),
+ nullptr, // We do not use heartbeat batching.
raft_pool_token_.get(),
&factory,
&peer);
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index bc25f450e..6270ec7dc 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -37,6 +37,8 @@
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/consensus_queue.h"
#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/multi_raft_batcher.h" // IWYU pragma: keep
+#include "kudu/consensus/multi_raft_consensus_data.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
@@ -108,6 +110,7 @@ void Peer::NewRemotePeer(RaftPeerPB peer_pb,
string tablet_id,
string leader_uuid,
PeerMessageQueue* queue,
+ std::shared_ptr<MultiRaftHeartbeatBatcher> multi_raft_batcher,
ThreadPoolToken* raft_pool_token,
PeerProxyFactory* peer_proxy_factory,
shared_ptr<Peer>* peer) {
@@ -116,6 +119,7 @@ void Peer::NewRemotePeer(RaftPeerPB peer_pb,
std::move(tablet_id),
std::move(leader_uuid),
queue,
+ multi_raft_batcher,
raft_pool_token,
peer_proxy_factory));
new_peer->Init();
@@ -126,6 +130,7 @@ Peer::Peer(RaftPeerPB peer_pb,
string tablet_id,
string leader_uuid,
PeerMessageQueue* queue,
+ std::shared_ptr<MultiRaftHeartbeatBatcher> multi_raft_batcher,
ThreadPoolToken* raft_pool_token,
PeerProxyFactory* peer_proxy_factory)
: tablet_id_(std::move(tablet_id)),
@@ -138,6 +143,8 @@ Peer::Peer(RaftPeerPB peer_pb,
queue_(queue),
failed_attempts_(0),
messenger_(peer_proxy_factory_->messenger()),
+ multi_raft_batcher_(multi_raft_batcher),
+ multi_raft_batcher_registration_(std::nullopt),
raft_pool_token_(raft_pool_token),
request_pending_(false),
closed_(false),
@@ -151,18 +158,34 @@ void Peer::Init() {
queue_->TrackPeer(peer_pb_);
}
- // Capture a weak_ptr reference into the functor so it can safely handle
- // outliving the peer.
weak_ptr<Peer> w_this = shared_from_this();
- heartbeater_ = PeriodicTimer::Create(
- messenger_,
- [w_this = std::move(w_this)]() {
- if (auto p = w_this.lock()) {
- WARN_NOT_OK(p->SignalRequest(true), "SignalRequest failed");
- }
- },
- MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
- heartbeater_->Start();
+ if (multi_raft_batcher_) {
+ // We must use the weak ptr. If we use shared_ptr, ~Peer will be called
+ // on the raft thread pool and it might cause a deadlock.
+ multi_raft_batcher_registration_ =
+ multi_raft_batcher_->Subscribe({[w_this](MultiRaftConsensusData* data) {
+ if (auto this_ptr = w_this.lock()) {
+ // If the request is already pending, we do not send it again.
+ if (this_ptr->request_pending_) {
+ return;
+ }
+ this_ptr->SendNextRequest(true, data);
+ }
+ }});
+
+ } else {
+ // Capture a weak_ptr reference into the functor so it can safely handle
+ // outliving the peer.
+ heartbeater_ = PeriodicTimer::Create(
+ messenger_,
+ [w_this = std::move(w_this)]() {
+ if (auto p = w_this.lock()) {
+ WARN_NOT_OK(p->SignalRequest(true), "SignalRequest failed");
+ }
+ },
+ MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
+ heartbeater_->Start();
+ }
}
Status Peer::SignalRequest(bool even_if_queue_empty) {
@@ -184,20 +207,20 @@ Status Peer::SignalRequest(bool even_if_queue_empty) {
// Capture a weak_ptr reference into the submitted functor so that we can
// safely handle the functor outliving its peer.
weak_ptr<Peer> w_this(shared_from_this());
- return raft_pool_token_->Submit([even_if_queue_empty, w_this = std::move(w_this)]() {
- if (auto p = w_this.lock()) {
- p->SendNextRequest(even_if_queue_empty);
- }
+ return raft_pool_token_->Submit(
+ [even_if_queue_empty, w_this = std::move(w_this)]() {
+ if (auto p = w_this.lock()) {
+ p->SendNextRequest(even_if_queue_empty, nullptr);
+ }
});
}
-void Peer::SendNextRequest(bool even_if_queue_empty) {
+void Peer::SendNextRequest(bool even_if_queue_empty, MultiRaftConsensusData* mrc_data) {
std::unique_lock l(peer_lock_);
if (PREDICT_FALSE(closed_)) {
return;
}
- // Only allow one request at a time.
if (request_pending_) {
return;
}
@@ -227,16 +250,44 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
bool needs_tablet_copy = false;
int64_t commit_index_before = request_.has_committed_index() ?
request_.committed_index() : kMinimumOpIdIndex;
- Status s = queue_->RequestForPeer(peer_pb_.permanent_uuid(), &request_,
- &replicate_msg_refs_, &needs_tablet_copy);
- int64_t commit_index_after = request_.has_committed_index() ?
- request_.committed_index() : kMinimumOpIdIndex;
+ Status s;
+ if (mrc_data) {
+ bool ops_pending = false;
+ s = queue_->RequestForPeer(peer_pb_.permanent_uuid(),
+ &request_,
+ &replicate_msg_refs_,
+ &needs_tablet_copy,
+ &ops_pending);
+ if (s.ok() && (ops_pending || (request_.has_committed_index() &&
+ request_.committed_index() > commit_index_before))) {
+ weak_ptr<Peer> w_this = shared_from_this();
+ // We don't want to send out a heavy request on the batching thread.
+ // So if we have any ops, we submit a task to the raft thread pool.
+ Status s2 = raft_pool_token_->Submit([w_this, even_if_queue_empty]() {
+ if (auto p = w_this.lock()) {
+ WARN_NOT_OK(p->SignalRequest(even_if_queue_empty), "SignalRequest failed");
+ }
+ });
+ if (s2.ok()) {
+ return;
+ }
+ DCHECK(false);
+ LOG_WITH_PREFIX_UNLOCKED(WARNING)
+ << "Failed to submit SignalRequest callback, processing heartbeat with"
+ << " ops on mrc thread";
+ }
+ } else {
+ s = queue_->RequestForPeer(
+ peer_pb_.permanent_uuid(), &request_, &replicate_msg_refs_, &needs_tablet_copy);
+ }
if (PREDICT_FALSE(!s.ok())) {
VLOG_WITH_PREFIX_UNLOCKED(1) << s.ToString();
return;
}
+ int64_t commit_index_after = request_.has_committed_index() ?
+ request_.committed_index() : kMinimumOpIdIndex;
// NOTE: we only perform this check after creating the RequestForPeer() call
// to ensure any peer health updates that happen therein associated with this
// peer actually happen. E.g. if we haven't been able to create a proxy in a
@@ -274,7 +325,12 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
if (req_has_ops) {
// If we're actually sending ops there's no need to heartbeat for a while.
- heartbeater_->Snooze();
+ if (multi_raft_batcher_) {
+ // TODO(martonka) add snooze logic to MultiRaftHeartbeatBatcher (I don't
+ // expect any measurable performance impact).
+ } else {
+ heartbeater_->Snooze();
+ }
}
if (!has_sent_first_request_) {
@@ -291,16 +347,36 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
<< SecureShortDebugString(request_);
controller_.Reset();
+
request_pending_ = true;
l.unlock();
// Capture a shared_ptr reference into the RPC callback so that we're guaranteed
// that this object outlives the RPC.
shared_ptr<Peer> s_this = shared_from_this();
- proxy_->UpdateAsync(request_, &response_, &controller_,
- [s_this]() {
- s_this->ProcessResponse();
- });
+
+ if (mrc_data && !req_has_ops) {
+ if (mrc_data->batch_req.consensus_requests_size() == 0) {
+ // If this is the first request in the batch, we need to set the
+ // responder UUID and term.
+ mrc_data->batch_req.set_caller_uuid(request_.caller_uuid());
+ mrc_data->batch_req.set_dest_uuid(request_.dest_uuid());
+ } else {
+ DCHECK(mrc_data->batch_req.caller_uuid() == request_.caller_uuid());
+ DCHECK(mrc_data->batch_req.dest_uuid() == request_.dest_uuid());
+ }
+ *mrc_data->batch_req.add_consensus_requests() = ToNoOpRequest(request_);
+ mrc_data->response_callback_data.emplace_back(
+ [s_this](const rpc::RpcController& controller,
+ const MultiRaftConsensusResponsePB& root,
+ const BatchedNoOpConsensusResponsePB* resp) {
+ s_this->ProcessResponseFromBatch(controller, root, resp);
+ });
+ } else {
+ DCHECK(!mrc_data) << "Messages with ops should not be sent on MRC thread";
+ proxy_->UpdateAsync(
+ request_, &response_, &controller_, [s_this]() { s_this->ProcessSingleResponse(); });
+ }
}
void Peer::StartElection() {
@@ -335,7 +411,39 @@ void Peer::StartElection() {
});
}
-void Peer::ProcessResponse() {
+void Peer::ProcessResponseFromBatch(const rpc::RpcController& controller,
+ const MultiRaftConsensusResponsePB& root,
+ const BatchedNoOpConsensusResponsePB* resp) {
+ response_.Clear();
+ if (root.has_error()) {
+ *response_.mutable_error() = root.error();
+ }
+ if (root.has_responder_uuid()) {
+ response_.set_responder_uuid(root.responder_uuid());
+ }
+ if (resp != nullptr) {
+ if (resp->has_responder_term()) {
+ response_.set_responder_term(resp->responder_term());
+ }
+ if (resp->has_status()) {
+ *response_.mutable_status() = resp->status();
+ }
+ if (resp->has_server_quiescing()) {
+ response_.set_server_quiescing(resp->server_quiescing());
+ }
+ if (resp->has_error()) {
+ *response_.mutable_error() = resp->error();
+ }
+ }
+
+ ProcessResponse(controller);
+}
+
+void Peer::ProcessSingleResponse() {
+ ProcessResponse(controller_);
+}
+
+void Peer::ProcessResponse(const rpc::RpcController& controller) {
// Note: This method runs on the reactor thread.
std::lock_guard lock(peer_lock_);
if (PREDICT_FALSE(closed_)) {
@@ -346,7 +454,7 @@ void Peer::ProcessResponse() {
MAYBE_FAULT(FLAGS_fault_crash_after_leader_request_fraction);
// Process RpcController errors.
- const auto controller_status = controller_.status();
+ const auto controller_status = controller.status();
if (!controller_status.ok()) {
auto ps = controller_status.IsRemoteError() ?
PeerStatus::REMOTE_ERROR : PeerStatus::RPC_LAYER_ERROR;
@@ -526,6 +634,10 @@ void Peer::Close() {
}
{
std::lock_guard lock(peer_lock_);
+ if (multi_raft_batcher_registration_) {
+ DCHECK(multi_raft_batcher_);
+ multi_raft_batcher_->Unsubscribe(*multi_raft_batcher_registration_);
+ }
closed_ = true;
}
VLOG_WITH_PREFIX_UNLOCKED(1) << "Closing peer: " << peer_pb_.permanent_uuid();
diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h
index 60905dc45..badf259fb 100644
--- a/src/kudu/consensus/consensus_peers.h
+++ b/src/kudu/consensus/consensus_peers.h
@@ -19,6 +19,7 @@
#include <atomic>
#include <cstdint>
#include <memory>
+#include <optional>
#include <ostream>
#include <string>
#include <vector>
@@ -46,6 +47,8 @@ class PeriodicTimer;
}
namespace consensus {
+class MultiRaftHeartbeatBatcher;
+struct MultiRaftConsensusData;
class PeerMessageQueue;
class PeerProxy;
class PeerProxyFactory;
@@ -118,6 +121,7 @@ class Peer :
std::string tablet_id,
std::string leader_uuid,
PeerMessageQueue* queue,
+ std::shared_ptr<MultiRaftHeartbeatBatcher> multi_raft_batcher,
ThreadPoolToken* raft_pool_token,
PeerProxyFactory* peer_proxy_factory,
std::shared_ptr<Peer>* peer);
@@ -127,18 +131,27 @@ class Peer :
std::string tablet_id,
std::string leader_uuid,
PeerMessageQueue* queue,
+ std::shared_ptr<MultiRaftHeartbeatBatcher> multi_raft_batcher,
ThreadPoolToken* raft_pool_token,
PeerProxyFactory* peer_proxy_factory);
private:
- void SendNextRequest(bool even_if_queue_empty);
+ // 'mrc_data' allows us to send periodic heartbeats in batches, removing
+ // some load from the system.
+ void SendNextRequest(bool even_if_queue_empty, MultiRaftConsensusData* mrc_data = nullptr);
+
+ void ProcessResponseFromBatch(const rpc::RpcController& controller,
+ const MultiRaftConsensusResponsePB& root,
+ const BatchedNoOpConsensusResponsePB* resp);
+
+ void ProcessSingleResponse();
// Signals that a response was received from the peer.
//
// This method is called from the reactor thread and calls
// DoProcessResponse() on raft_pool_token_ to do any work that requires IO or
// lock-taking.
- void ProcessResponse();
+ void ProcessResponse(const rpc::RpcController& controller);
// Run on 'raft_pool_token'. Does response handling that requires IO or may block.
void DoProcessResponse();
@@ -201,6 +214,9 @@ class Peer :
std::shared_ptr<rpc::Messenger> messenger_;
+ std::shared_ptr<MultiRaftHeartbeatBatcher> multi_raft_batcher_;
+ std::optional<uint64_t> multi_raft_batcher_registration_;
+
// Thread pool token used to construct requests to this peer.
//
// RaftConsensus owns this shared token and is responsible for destroying it.
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index def546acf..0e5f40c1b 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -646,7 +646,8 @@ HealthReportPB::HealthStatus PeerMessageQueue::PeerHealthStatus(const TrackedPee
Status PeerMessageQueue::RequestForPeer(const string& uuid,
ConsensusRequestPB* request,
vector<ReplicateRefPtr>* msg_refs,
- bool* needs_tablet_copy) {
+ bool* needs_tablet_copy,
+ bool* return_no_ops_only) {
// Maintain a thread-safe copy of necessary members.
OpId preceding_id;
int64_t current_term;
@@ -664,6 +665,13 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
}
peer_copy = *peer;
+ if (return_no_ops_only) {
+ *return_no_ops_only = peer_copy.last_exchange_status == PeerStatus::TABLET_NOT_FOUND
+ || log_cache_.HasOpBeenWritten(peer->next_index);
+ if (*return_no_ops_only) {
+ return Status::OK(); // Nothing is touched, we can try again from a different thread later.
+ }
+ }
// Clear the requests without deleting the entries, as they may be in use by other peers.
request->mutable_ops()->UnsafeArenaExtractSubrange(0, request->ops_size(), nullptr);
@@ -720,6 +728,11 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
vector<ReplicateRefPtr> messages;
int64_t max_batch_size = FLAGS_consensus_max_batch_size_bytes - request->ByteSizeLong();
+ // Messages may have been added since the early-return check, so there is a very
+ // small chance that we end up here with 'return_no_ops_only' set.
+ if (return_no_ops_only != nullptr) {
+ max_batch_size = 0;
+ }
// We try to get the follower's next_index from our log.
Status s = log_cache_.ReadOps(peer_copy.next_index - 1,
max_batch_size,
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 16d1f7ed3..432c737d3 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -276,7 +276,8 @@ class PeerMessageQueue {
Status RequestForPeer(const std::string& uuid,
ConsensusRequestPB* request,
std::vector<ReplicateRefPtr>* msg_refs,
- bool* needs_tablet_copy);
+ bool* needs_tablet_copy,
+ bool* return_no_ops_only = nullptr);
// Fill in a StartTabletCopyRequest for the specified peer.
// If that peer should not initiate Tablet Copy, returns a non-OK status.
diff --git a/src/kudu/consensus/multi_raft_batcher.cc b/src/kudu/consensus/multi_raft_batcher.cc
new file mode 100644
index 000000000..94e76539e
--- /dev/null
+++ b/src/kudu/consensus/multi_raft_batcher.cc
@@ -0,0 +1,306 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/consensus/multi_raft_batcher.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <mutex> // IWYU pragma: keep
+#include <ostream>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus.proxy.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/multi_raft_consensus_data.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/rpc/periodic.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
+
+namespace kudu {
+class DnsResolver;
+
+namespace rpc {
+class Messenger;
+} // namespace rpc
+} // namespace kudu
+
+DECLARE_int32(consensus_rpc_timeout_ms);
+DECLARE_int32(raft_heartbeat_interval_ms);
+
+DEFINE_int32(multi_raft_heartbeat_window_ms, 100,
+ "The batch time window for heartbeat batching. "
+ "For minimal delay and still maximum effectiveness, set "
+ "multi_raft_heartbeat_window_ms = heartbeat_interval_ms * ("
+ "batch_size / {estimated follower peers from the same host}) + {a little tolerance}; "
+ "however, a 0.1 * heartbeat_interval_ms is good, and there is no reason to "
+ "set it any lower. "
+ "This value is also forced to be less than or equal to half of the heartbeat interval, "
+ "because it makes no sense to introduce a possible delay comparable to the heartbeat interval."
+);
+TAG_FLAG(multi_raft_heartbeat_window_ms, experimental);
+
+DEFINE_bool(enable_multi_raft_heartbeat_batcher,
+ false,
+ "Whether to enable the batching of raft heartbeats.");
+TAG_FLAG(enable_multi_raft_heartbeat_batcher, experimental);
+
+DEFINE_int32(multi_raft_batch_size, 30, "Maximum batch size for a multi-raft consensus payload.");
+TAG_FLAG(multi_raft_batch_size, experimental);
+DEFINE_validator(multi_raft_batch_size,
+ [](const char* /*flagname*/, int32_t value) { return value > 0; });
+
+namespace kudu {
+namespace consensus {
+
+using kudu::DnsResolver;
+using rpc::PeriodicTimer;
+
+uint64_t MultiRaftHeartbeatBatcher::Subscribe(const PeriodicHeartbeater& heartbeater) {
+ std::lock_guard l(heartbeater_lock_);
+ DCHECK(!closed_);
+ auto it = peers_.insert({next_id_, heartbeater});
+ DCHECK(it.second) << "Peer with id " << next_id_ << " already exists";
+ queue_.push_back(
+ {MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms), next_id_});
+ return next_id_++;
+}
+
+void MultiRaftHeartbeatBatcher::Unsubscribe(uint64_t id) {
+ std::lock_guard l(heartbeater_lock_);
+ DCHECK_EQ(1, peers_.count(id));
+ peers_.erase(id);
+}
+
+MultiRaftHeartbeatBatcher::MultiRaftHeartbeatBatcher(
+ const kudu::HostPort& hostport,
+ DnsResolver* dns_resolver,
+ std::shared_ptr<kudu::rpc::Messenger> messenger,
+ MonoDelta flush_interval,
+ std::shared_ptr<ThreadPoolToken> raft_pool_token)
+ : messenger_(std::move(messenger)),
+ consensus_proxy_(std::make_unique<ConsensusServiceProxy>(messenger_,
hostport, dns_resolver)),
+ batch_time_window_(flush_interval),
+ raft_pool_token_(std::move(raft_pool_token)),
+ closed_(false) {}
+
+void MultiRaftHeartbeatBatcher::StartTimer() {
+ std::weak_ptr<MultiRaftHeartbeatBatcher> const weak_peer = shared_from_this();
+ DCHECK_EQ(nullptr, heartbeat_timer_) << "Heartbeat timer started twice";
+ heartbeat_timer_ = PeriodicTimer::Create(
+ messenger_,
+ [weak_peer]() {
+ if (auto peer = weak_peer.lock()) {
+ peer->PrepareNextBatch();
+ }
+ },
+ batch_time_window_);
+ heartbeat_timer_->Start();
+}
+
+void MultiRaftHeartbeatBatcher::PrepareNextBatch() {
+ std::vector<PeriodicHeartbeater> current_calls;
+ current_calls.reserve(FLAGS_multi_raft_batch_size);
+
+ auto submit_callbacks = [&]() {
+ auto this_ptr = shared_from_this();
+ KUDU_WARN_NOT_OK(
+ raft_pool_token_->Submit([this_ptr, current_calls = std::move(current_calls)]() {
+ this_ptr->SendOutScheduled(current_calls);
+ }),
+ "Failed to submit multi-raft heartbeat batcher task");
+ current_calls.clear();
+ current_calls.reserve(FLAGS_multi_raft_batch_size);
+ };
+
+ const auto sending_period = MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms);
+
+ std::lock_guard lock(heartbeater_lock_);
+
+ auto send_until = MonoTime::Now() + batch_time_window_;
+ if (closed_) {
+ return; // Batcher is closed, raft_pool_token_ might be invalid.
+ }
+
+ while (!queue_.empty() && queue_.front().time <= send_until) {
+ auto front = queue_.front();
+ queue_.pop_front();
+ auto peer = FindOrNull(peers_, front.id);
+ if (peer == nullptr) {
+ continue; // Peer was unsubscribed.
+ }
+ // TODO(martonka): MonoTime::Now() + sending_period would make much more sense,
+ // but this is consistent with the old timer behavior.
+ auto next_time = front.time + sending_period;
+ queue_.push_back({next_time, front.id});
+ current_calls.emplace_back(*peer);
+ if (current_calls.size() >= FLAGS_multi_raft_batch_size) {
+ submit_callbacks();
+ }
+ }
+ if (!current_calls.empty()) {
+ submit_callbacks();
+ }
+}
+
+void MultiRaftHeartbeatBatcher::MultiRaftUpdateHeartbeatResponseCallback(
+ std::shared_ptr<MultiRaftConsensusData> data) {
+ for (int i = 0; i < data->batch_req.consensus_requests_size(); i++) {
+ const auto* resp = data->batch_res.consensus_responses_size() > i
+ ? &data->batch_res.consensus_responses(i)
+ : nullptr;
+ data->response_callback_data[i](data->controller, data->batch_res, resp);
+ }
+}
+
+void MultiRaftHeartbeatBatcher::Shutdown() {
+ {
+ std::lock_guard lock(heartbeater_lock_);
+ if (closed_) {
+ return; // Already closed.
+ }
+ closed_ = true;
+ }
+ if (heartbeat_timer_) {
+ heartbeat_timer_->Stop();
+ heartbeat_timer_.reset();
+ }
+}
+
+void MultiRaftHeartbeatBatcher::SendOutScheduled(
+ const std::vector<PeriodicHeartbeater>& scheduled_callbacks) {
+ // No need to hold the lock here, as we are not modifying the state of the batcher.
+ DCHECK_LT(0, scheduled_callbacks.size());
+ DCHECK_GE(FLAGS_multi_raft_batch_size, scheduled_callbacks.size());
+ auto data = std::make_shared<MultiRaftConsensusData>(scheduled_callbacks.size());
+
+ for (const auto& cb : scheduled_callbacks) {
+ cb(data.get());
+ }
+ // If all the called heartbeaters already have another call in progress, or pending
+ // elements in their queues, we might end up with an empty batch.
+ if (data->batch_req.consensus_requests_size() == 0) {
+ return;
+ }
+
+ DCHECK(data->batch_req.IsInitialized());
+ VLOG(1) << "Sending BatchRequest with size: " << data->batch_req.consensus_requests_size();
+
+ data->controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));
+
+ // Copy the 'data' shared pointer into the callback to ensure that it remains valid.
+ consensus_proxy_->MultiRaftUpdateConsensusAsync(
+ data->batch_req, &data->batch_res, &data->controller, [data, inst = shared_from_this()]() {
+ inst->MultiRaftUpdateHeartbeatResponseCallback(data);
+ });
+}
+
+namespace {
+MonoDelta GetTimeWindow() {
+ // We don't want to delay more than half of the heartbeat interval.
+ // Even a quarter of the interval starts to be questionable delay,
+ // but half is for sure too much.
+ auto flush_interval =
+ std::min(FLAGS_multi_raft_heartbeat_window_ms, FLAGS_consensus_rpc_timeout_ms / 2);
+ if (flush_interval != FLAGS_multi_raft_heartbeat_window_ms) {
+ LOG(ERROR) << "multi_raft_heartbeat_window_ms should not be more than "
+ << "consensus_rpc_timeout_ms / 2, forcing multi_raft_heartbeat_window_ms = "
+ << flush_interval;
+ }
+ return MonoDelta::FromMilliseconds(flush_interval);
+}
+} // namespace
+
+MultiRaftManager::MultiRaftManager(kudu::DnsResolver* dns_resolver,
+ const scoped_refptr<MetricEntity>& entity)
+ : dns_resolver_(dns_resolver),
+ batch_time_window_(GetTimeWindow()),
+ closed_(false)
+{}
+
+void MultiRaftManager::Init(const std::shared_ptr<rpc::Messenger>& messenger,
+ ThreadPool* raft_pool) {
+ DCHECK(!closed_);
+ messenger_ = messenger;
+ raft_pool_ = raft_pool;
+}
+
+MultiRaftHeartbeatBatcherPtr MultiRaftManager::AddOrGetBatcher(
+ const kudu::consensus::RaftPeerPB& remote_peer_pb) {
+ if (!FLAGS_enable_multi_raft_heartbeat_batcher) {
+ return nullptr; // Batching is disabled.
+ }
+
+ DCHECK(messenger_);
+ DCHECK(raft_pool_);
+ auto hostport = HostPortFromPB(remote_peer_pb.last_known_addr());
+ std::lock_guard lock(mutex_);
+ if (closed_) {
+ DCHECK(false) << "We are in shutdown phase but still adding a peer. Should never happen";
+ return nullptr;
+ }
+ MultiRaftHeartbeatBatcherPtr batcher;
+
+ // After taking the lock, check if there is already a batcher
+ // for the same remote host and return it.
+ // If we used to have replicas shared with this host, but they were all removed,
+ // the old batcher might be deleted already (so the weak ptr is expired).
+ auto res = FindOrNull(batchers_, hostport);
+ if (res && (batcher = res->batcher.lock())) {
+ return batcher;
+ }
+ // We still have the token for this host, even if the batcher was deleted.
+ auto raft_pool_token = res ? res->raft_pool_token
+ : std::shared_ptr<ThreadPoolToken>(
+ raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT));
+ batcher = std::make_shared<MultiRaftHeartbeatBatcher>(
+ hostport, dns_resolver_, messenger_, batch_time_window_, raft_pool_token);
+ batcher->StartTimer();
+ batchers_.insert({hostport, BatcherAndPoolToken(batcher, raft_pool_token)});
+ return batcher;
+}
+
+void MultiRaftManager::Shutdown() {
+ std::unordered_map<HostPort, BatcherAndPoolToken, HostPortHasher> batchers;
+ {
+ std::lock_guard lock(mutex_);
+ batchers.swap(batchers_);
+ }
+ for (auto& entry : batchers) {
+ if (auto batcher = entry.second.batcher.lock()) {
+ batcher->Shutdown();
+ }
+ entry.second.raft_pool_token->Shutdown();
+ }
+ batchers_.clear();
+}
+
+} // namespace consensus
+} // namespace kudu
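The deque-based schedule in PrepareNextBatch works because every subscriber shares the same period: each popped entry is re-pushed with its time plus one period, so the deque stays sorted by due time without a priority queue. A minimal standalone sketch of that invariant, with illustrative names that are not from the patch:

```cpp
#include <cstdint>
#include <deque>
#include <vector>

struct Entry {
  int64_t due_ms;   // next due time for this subscriber
  uint64_t id;      // subscriber id
};

// Pop every entry due within 'now_ms + window_ms', re-enqueue it one period
// later, and return the ids to heartbeat. Because all entries share the same
// period, appending to the back preserves the sorted order of the deque.
std::vector<uint64_t> CollectDue(std::deque<Entry>* q, int64_t now_ms,
                                 int64_t window_ms, int64_t period_ms) {
  std::vector<uint64_t> due;
  const int64_t send_until = now_ms + window_ms;
  while (!q->empty() && q->front().due_ms <= send_until) {
    Entry e = q->front();
    q->pop_front();
    q->push_back({e.due_ms + period_ms, e.id});
    due.push_back(e.id);
  }
  return due;
}
```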
diff --git a/src/kudu/consensus/multi_raft_batcher.h b/src/kudu/consensus/multi_raft_batcher.h
new file mode 100644
index 000000000..b09406bee
--- /dev/null
+++ b/src/kudu/consensus/multi_raft_batcher.h
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <deque>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "kudu/consensus/consensus.proxy.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+
+namespace kudu {
+class DnsResolver;
+class MetricEntity;
+class ThreadPool;
+class ThreadPoolToken;
+
+namespace rpc {
+class Messenger;
+class PeriodicTimer;
+class RpcController;
+} // namespace rpc
+
+namespace consensus {
+class BatchedNoOpConsensusResponsePB;
+class MultiRaftConsensusResponsePB;
+class RaftPeerPB;
+
+typedef std::unique_ptr<ConsensusServiceProxy> ConsensusServiceProxyPtr;
+
+using HeartbeatResponseCallback = std::function<void(const rpc::RpcController&,
+ const MultiRaftConsensusResponsePB&,
+ const BatchedNoOpConsensusResponsePB*)>;
+
+// - MultiRaftHeartbeatBatcher is responsible for batching the processing
+//   and sending of no-op heartbeats, saving CPU and network resources.
+// - Peers can request (using Subscribe) to be called back on a
+//   {raft_heartbeat_interval_ms +/- multi_raft_heartbeat_window_ms} interval,
+//   together with other peers that are connected to the same host.
+//   The average interval over a long time will be raft_heartbeat_interval_ms.
+// - This allows for less context switching and slightly smaller network
+//   overhead, as some fields can be shared and RPC calls can be batched.
+// - The subscribed peers will be called back with a MultiRaftConsensusData
+//   object, and it is the responsibility of the called peer to:
+//   + Correctly append to the MultiRaftConsensusData object.
+//   + Fill the shared fields if needed.
+//   + If it would do anything with significant CPU usage (pending items in
+//     the queue), it should submit another task to the Raft pool and return
+//     early, so as not to block the other peers in the same batch.
+struct MultiRaftConsensusData;
+
+class MultiRaftHeartbeatBatcher : public
std::enable_shared_from_this<MultiRaftHeartbeatBatcher> {
+ public:
+ MultiRaftHeartbeatBatcher(const kudu::HostPort& hostport,
+ ::kudu::DnsResolver* dns_resolver,
+ std::shared_ptr<rpc::Messenger> messenger,
+ MonoDelta flush_interval,
+ std::shared_ptr<ThreadPoolToken> raft_pool_token);
+
+ ~MultiRaftHeartbeatBatcher() = default;
+
+ using PeriodicHeartbeater = std::function<void(MultiRaftConsensusData*)>;
+ // Subscribe for periodic callbacks batched with other peers.
+ // Returns a unique id for the peer, which can be used to unsubscribe.
+ uint64_t Subscribe(const PeriodicHeartbeater& heartbeater);
+ void Unsubscribe(uint64_t id);
+
+ private:
+ friend class MultiRaftManager;
+
+ // Collect the heartbeats due in the next flush interval.
+ // Submit a task to the Raft pool to send them in a single RPC call per batch.
+ void PrepareNextBatch();
+
+ void Shutdown();
+
+ void StartTimer();
+
+ void MultiRaftUpdateHeartbeatResponseCallback(std::shared_ptr<MultiRaftConsensusData> data);
+
+ void SendOutScheduled(const std::vector<PeriodicHeartbeater>& scheduled_callbacks);
+
+ std::shared_ptr<rpc::PeriodicTimer> heartbeat_timer_;
+ std::shared_ptr<rpc::Messenger> messenger_;
+
+ ConsensusServiceProxyPtr consensus_proxy_;
+
+ uint64_t next_id_ = 1;
+ std::unordered_map<uint64_t, PeriodicHeartbeater> peers_;
+ // Next required heartbeat time for a peer. queue_ is ordered by time.
+ // After calling a peer, push it to the end of the queue with its next due time.
+ // Since all callbacks share the same period (raft_heartbeat_interval_ms),
+ // a simple deque is sufficient and faster than a priority queue.
+ struct Callback {
+ MonoTime time; // Time for the next heartbeat for this peer.
+ uint64_t id; // id of the peer inside peers_.
+ };
+ std::deque<Callback> queue_;
+ // Protects queue_ and peers_.
+ // Peers might subscribe concurrently, and PrepareNextBatch also uses both of
+ // these members.
+ std::mutex heartbeater_lock_;
+
+ const MonoDelta batch_time_window_;
+ std::shared_ptr<ThreadPoolToken> raft_pool_token_;
+ bool closed_;
+};
+
+using MultiRaftHeartbeatBatcherPtr = std::shared_ptr<MultiRaftHeartbeatBatcher>;
+
+// MultiRaftManager is responsible for managing all MultiRaftHeartbeatBatchers
+// for a given TServer (utilizes a mapping between a HostPort and the corresponding batcher).
+// MultiRaftManager allows multiple peers to share the same batcher
+// if they are connected to the same remote host.
+class MultiRaftManager : public std::enable_shared_from_this<MultiRaftManager> {
+ public:
+ MultiRaftManager(kudu::DnsResolver* dns_resolver, const scoped_refptr<MetricEntity>& entity);
+ ~MultiRaftManager() = default;
+
+ void Init(const std::shared_ptr<rpc::Messenger>& messenger, ThreadPool* raft_pool);
+
+ void Shutdown();
+
+ MultiRaftHeartbeatBatcherPtr AddOrGetBatcher(const kudu::consensus::RaftPeerPB& remote_peer_pb);
+
+ private:
+ std::shared_ptr<rpc::Messenger> messenger_;
+
+ kudu::DnsResolver* dns_resolver_;
+
+ ThreadPool* raft_pool_ = nullptr;
+
+ // Protects raft_pool_ and batchers_ during
+ // concurrent calls of AddOrGetBatcher.
+ std::mutex mutex_;
+
+ // Uses a weak_ptr to allow deallocation of unused batchers once no more
+ // consensus peers use them (and to stop the periodic timer).
+ // The MultiRaftHeartbeatBatcher destructor might be called on the Raft pool
+ // using the same token. It is not safe to destruct the token there, so we
+ // keep it alive separately. Keeping one unused token per decommissioned host
+ // until restart should not cause any problems.
+ // TODO(martonka): Create a periodic timer to clean up tokens for dead hosts.
+ // Unless the cluster goes through millions of servers without restarting this
+ // TServer, this is not a problem. However, running a periodic cleanup timer
+ // (once per hour/day/week is enough) would be nice.
+ struct BatcherAndPoolToken {
+ std::weak_ptr<MultiRaftHeartbeatBatcher> batcher;
+ std::shared_ptr<ThreadPoolToken> raft_pool_token;
+ BatcherAndPoolToken(std::shared_ptr<MultiRaftHeartbeatBatcher> b,
+ std::shared_ptr<ThreadPoolToken> t)
+ : batcher(b), raft_pool_token(std::move(t)) {}
+ };
+ std::unordered_map<HostPort, BatcherAndPoolToken, HostPortHasher> batchers_;
+
+ const MonoDelta batch_time_window_;
+
+ bool closed_;
+};
+
+} // namespace consensus
+} // namespace kudu
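The batchers_ map above keeps only a weak_ptr per host, so a batcher is destroyed as soon as its last consensus peer releases it, and a later lookup simply recreates it. A condensed sketch of that weak_ptr-cache pattern, with simplified types rather than the patch's exact code:

```cpp
#include <memory>
#include <string>
#include <unordered_map>

struct Batcher {
  std::string host;
};

// Cache of per-host batchers. A weak_ptr lets an entry expire as soon as no
// peer holds the shared_ptr anymore; the next lookup recreates the batcher.
class BatcherCache {
 public:
  std::shared_ptr<Batcher> AddOrGet(const std::string& host) {
    auto& slot = cache_[host];
    if (auto existing = slot.lock()) {
      return existing;  // still alive: share it between peers
    }
    auto fresh = std::make_shared<Batcher>(Batcher{host});
    slot = fresh;  // cache only a weak reference
    return fresh;
  }

 private:
  std::unordered_map<std::string, std::weak_ptr<Batcher>> cache_;
};
```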
diff --git a/src/kudu/consensus/multi_raft_consensus_data.h b/src/kudu/consensus/multi_raft_consensus_data.h
new file mode 100644
index 000000000..c63a7cf88
--- /dev/null
+++ b/src/kudu/consensus/multi_raft_consensus_data.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <functional>
+#include <vector>
+
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/rpc/rpc_controller.h"
+
+namespace kudu {
+namespace consensus {
+
+// Callback to process a single heartbeat response from a MultiRaftConsensus RPC call.
+// MultiRaftConsensusResponsePB contains errors shared between the batched requests,
+// and peer-specific errors are in the BatchedNoOpConsensusResponsePB.
+using HeartbeatResponseCallback = std::function<void(const rpc::RpcController&,
+ const MultiRaftConsensusResponsePB&,
+ const BatchedNoOpConsensusResponsePB*)>;
+
+// Data for a single multi-raft consensus batch.
+// batch_req and response_callback_data must have the same number of elements,
+// corresponding to the same heartbeaters in order.
+// The MultiRaftUpdateConsensus RPC call will fill batch_res while preserving that order.
+struct MultiRaftConsensusData {
+ MultiRaftConsensusRequestPB batch_req;
+ MultiRaftConsensusResponsePB batch_res;
+ // Since we send out multiple RPCs in parallel, each batch needs its own RPC controller.
+ rpc::RpcController controller;
+ // Callbacks for the individual heartbeaters.
+ std::vector<HeartbeatResponseCallback> response_callback_data;
+ explicit MultiRaftConsensusData(size_t expected_size) {
+ response_callback_data.reserve(expected_size);
+ }
+};
+
+inline BatchedNoOpConsensusRequestPB ToNoOpRequest(const ConsensusRequestPB& req) {
+ BatchedNoOpConsensusRequestPB res;
+ if (req.has_tablet_id()) {
+ res.set_tablet_id(req.tablet_id());
+ }
+ if (req.has_caller_term()) {
+ res.set_caller_term(req.caller_term());
+ }
+ if (req.has_preceding_id()) {
+ *res.mutable_preceding_id() = req.preceding_id();
+ }
+ if (req.has_committed_index()) {
+ res.set_committed_index(req.committed_index());
+ }
+ if (req.has_all_replicated_index()) {
+ res.set_all_replicated_index(req.all_replicated_index());
+ }
+ if (req.has_safe_timestamp()) {
+ res.set_safe_timestamp(req.safe_timestamp());
+ }
+ if (req.has_last_idx_appended_to_leader()) {
+ res.set_last_idx_appended_to_leader(req.last_idx_appended_to_leader());
+ }
+
+ return res;
+}
+
+} // namespace consensus
+} // namespace kudu
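Each batched request is matched to its response purely by position, which is why batch_req and response_callback_data must stay aligned. A toy illustration of the index-matching dispatch done in MultiRaftUpdateHeartbeatResponseCallback, with plain structs standing in for the protobufs:

```cpp
#include <functional>
#include <vector>

struct Resp {
  int term;
};

// Invoke callback i with response i, or with nullptr if the server returned
// fewer responses than requests (e.g. a batch-level error).
void DispatchBatch(const std::vector<Resp>& responses,
                   const std::vector<std::function<void(const Resp*)>>& callbacks) {
  for (size_t i = 0; i < callbacks.size(); ++i) {
    const Resp* r = i < responses.size() ? &responses[i] : nullptr;
    callbacks[i](r);
  }
}
```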
diff --git a/src/kudu/consensus/peer_manager.cc b/src/kudu/consensus/peer_manager.cc
index be43243a9..a7cd11588 100644
--- a/src/kudu/consensus/peer_manager.cc
+++ b/src/kudu/consensus/peer_manager.cc
@@ -27,6 +27,7 @@
#include "kudu/consensus/consensus_peers.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/multi_raft_batcher.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
@@ -46,6 +47,7 @@ PeerManager::PeerManager(string tablet_id,
string local_uuid,
PeerProxyFactory* peer_proxy_factory,
PeerMessageQueue* queue,
+ consensus::MultiRaftManager* multi_raft_manager,
ThreadPoolToken* raft_pool_token,
scoped_refptr<log::Log> log)
: tablet_id_(std::move(tablet_id)),
@@ -53,7 +55,8 @@ PeerManager::PeerManager(string tablet_id,
peer_proxy_factory_(peer_proxy_factory),
queue_(queue),
raft_pool_token_(raft_pool_token),
- log_(std::move(log)) {
+ log_(std::move(log)),
+ multi_raft_manager_(multi_raft_manager) {
}
PeerManager::~PeerManager() {
@@ -75,10 +78,15 @@ void PeerManager::UpdateRaftConfig(const RaftConfigPB& config) {
VLOG(1) << GetLogPrefix() << "Adding remote peer. Peer: " << SecureShortDebugString(peer_pb);
shared_ptr<Peer> remote_peer;
+ std::shared_ptr<MultiRaftHeartbeatBatcher> multi_raft_batcher = nullptr;
+ if (multi_raft_manager_) {
+ multi_raft_batcher = multi_raft_manager_->AddOrGetBatcher(peer_pb);
+ }
Peer::NewRemotePeer(peer_pb,
tablet_id_,
local_uuid_,
queue_,
+ multi_raft_batcher,
raft_pool_token_,
peer_proxy_factory_,
&remote_peer);
diff --git a/src/kudu/consensus/peer_manager.h b/src/kudu/consensus/peer_manager.h
index 7c2a9ee61..503eaf0c6 100644
--- a/src/kudu/consensus/peer_manager.h
+++ b/src/kudu/consensus/peer_manager.h
@@ -39,7 +39,7 @@ class Peer;
class PeerMessageQueue;
class PeerProxyFactory;
class RaftConfigPB;
-
+class MultiRaftManager;
// Manages the remote peers that pull data from the local queue and send updates to the
// remote machines.
class PeerManager {
@@ -50,6 +50,7 @@ class PeerManager {
std::string local_uuid,
PeerProxyFactory* peer_proxy_factory,
PeerMessageQueue* queue,
+ MultiRaftManager* multi_raft_manager,
ThreadPoolToken* raft_pool_token,
scoped_refptr<log::Log> log);
@@ -80,6 +81,7 @@ class PeerManager {
ThreadPoolToken* raft_pool_token_;
scoped_refptr<log::Log> log_;
PeersMap peers_;
+ MultiRaftManager* multi_raft_manager_;
mutable simple_spinlock lock_;
DISALLOW_COPY_AND_ASSIGN(PeerManager);
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 9fcef781f..b39a653aa 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -237,6 +237,7 @@ Status RaftConsensus::Create(ConsensusOptions options,
Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
unique_ptr<PeerProxyFactory> peer_proxy_factory,
scoped_refptr<log::Log> log,
+ MultiRaftManager* multi_raft_manager,
unique_ptr<TimeManager> time_manager,
ConsensusRoundHandler* round_handler,
const scoped_refptr<MetricEntity>& metric_entity,
@@ -300,6 +301,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
peer_uuid(),
peer_proxy_factory_.get(),
queue.get(),
+ multi_raft_manager,
raft_pool_token_.get(),
log_));
unique_ptr<PendingRounds> pending(new PendingRounds(
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 921d9210d..27579aea5 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -67,6 +67,7 @@ namespace consensus {
class ConsensusMetadataManager;
class ConsensusRound;
class ConsensusRoundHandler;
+class MultiRaftManager;
class PeerManager;
class PeerProxyFactory;
class PendingRounds;
@@ -163,6 +164,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
Status Start(const ConsensusBootstrapInfo& info,
std::unique_ptr<PeerProxyFactory> peer_proxy_factory,
scoped_refptr<log::Log> log,
+ MultiRaftManager* multi_raft_manager,
std::unique_ptr<TimeManager> time_manager,
ConsensusRoundHandler* round_handler,
const scoped_refptr<MetricEntity>& metric_entity,
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index b2eb93465..51d16958e 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -238,6 +238,7 @@ class RaftConsensusQuorumTest : public KuduTest {
boot_info,
std::move(proxy_factory),
logs_[i],
+ nullptr,
std::move(time_manager),
op_factories_.back().get(),
metric_entity_,
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index abff6e565..8a19829ec 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -536,6 +536,7 @@ Status SysCatalogTable::SetupTablet(
master_->messenger(),
/*result_tracker*/nullptr,
log,
+ nullptr,
master_->tablet_prepare_pool(),
master_->dns_resolver()), "failed to start system catalog replica");
diff --git a/src/kudu/tablet/tablet_replica-test-base.cc b/src/kudu/tablet/tablet_replica-test-base.cc
index d522949eb..cd75b1022 100644
--- a/src/kudu/tablet/tablet_replica-test-base.cc
+++ b/src/kudu/tablet/tablet_replica-test-base.cc
@@ -20,12 +20,14 @@
#include <memory>
#include <ostream>
#include <string>
+#include <type_traits>
#include <utility>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/common/common.pb.h"
+#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/consensus_meta_manager.h"
@@ -43,7 +45,6 @@
#include "kudu/tablet/ops/op.h"
#include "kudu/tablet/ops/write_op.h"
#include "kudu/tablet/tablet-test-util.h"
-#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_bootstrap.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica.h"
@@ -53,6 +54,12 @@
#include "kudu/util/pb_util.h"
#include "kudu/util/test_macros.h"
+namespace kudu {
+namespace tablet {
+class Tablet;
+} // namespace tablet
+} // namespace kudu
+
using kudu::consensus::ConsensusBootstrapInfo;
using kudu::consensus::ConsensusMetadata;
using kudu::consensus::ConsensusMetadataManager;
@@ -178,6 +185,7 @@ Status TabletReplicaTestBase::StartReplica(const ConsensusBootstrapInfo& info) {
messenger_,
scoped_refptr<ResultTracker>(),
log,
+ nullptr,
prepare_pool_.get(),
dns_resolver_.get());
}
@@ -223,6 +231,7 @@ Status TabletReplicaTestBase::RestartReplica(bool reset_tablet) {
messenger_,
scoped_refptr<ResultTracker>(),
log,
+ nullptr,
prepare_pool_.get(),
dns_resolver_.get()));
// Wait for the replica to be usable.
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index e393dab8c..fedea2f2e 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -226,6 +226,7 @@ Status TabletReplica::Start(
shared_ptr<Messenger> messenger,
scoped_refptr<ResultTracker> result_tracker,
scoped_refptr<Log> log,
+ consensus::MultiRaftManager* multi_raft_manager,
ThreadPool* prepare_pool,
DnsResolver* resolver) {
DCHECK(tablet) << "A TabletReplica must be provided with a Tablet";
@@ -292,6 +293,7 @@ Status TabletReplica::Start(
bootstrap_info,
std::move(peer_proxy_factory),
log,
+ multi_raft_manager,
std::move(time_manager),
this,
metric_entity,
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index dd1ffb02f..95212978f 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -65,6 +65,7 @@ class TxnOpDispatcherITest_PreliminaryTasksTimeout_Test;
namespace consensus {
class ConsensusMetadataManager;
+class MultiRaftManager;
class OpStatusPB;
class TimeManager;
}
@@ -130,6 +131,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
std::shared_ptr<rpc::Messenger> messenger,
scoped_refptr<rpc::ResultTracker> result_tracker,
scoped_refptr<log::Log> log,
+ consensus::MultiRaftManager* multi_raft_manager,
ThreadPool* prepare_pool,
DnsResolver* resolver);
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 614741c07..115a98e23 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -187,6 +187,7 @@ class TabletCopyTest : public KuduTabletTest,
messenger,
scoped_refptr<rpc::ResultTracker>(),
log,
+ nullptr,
prepare_pool_.get(),
dns_resolver_.get()));
ASSERT_OK(tablet_replica_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 8de6e6a6b..07d00ccf9 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -28,6 +28,7 @@
#include <string>
#include <type_traits>
#include <unordered_set>
+#include <utility>
#include <vector>
#include <gflags/gflags.h>
@@ -207,6 +208,8 @@ using kudu::consensus::GetNodeInstanceResponsePB;
using kudu::consensus::LeaderStepDownMode;
using kudu::consensus::LeaderStepDownRequestPB;
using kudu::consensus::LeaderStepDownResponsePB;
+using kudu::consensus::MultiRaftConsensusRequestPB;
+using kudu::consensus::MultiRaftConsensusResponsePB;
using kudu::consensus::OpId;
using kudu::consensus::RaftConsensus;
using kudu::consensus::RaftPeerPB;
@@ -261,6 +264,25 @@ extern const char* CFILE_CACHE_MISS_BYTES_METRIC_NAME;
extern const char* CFILE_CACHE_HIT_BYTES_METRIC_NAME;
}
+namespace {
+ consensus::ConsensusRequestPB ToSingleRequest(
+ const consensus::MultiRaftConsensusRequestPB& parent_req,
+ const consensus::BatchedNoOpConsensusRequestPB& req) {
+ consensus::ConsensusRequestPB result;
+
+ result.set_dest_uuid(parent_req.dest_uuid());
+ result.set_caller_uuid(parent_req.caller_uuid());
+ result.set_tablet_id(req.tablet_id());
+ result.set_caller_term(req.caller_term());
+ *result.mutable_preceding_id() = req.preceding_id();
+ result.set_committed_index(req.committed_index());
+ result.set_all_replicated_index(req.all_replicated_index());
+ result.set_safe_timestamp(req.safe_timestamp());
+ result.set_last_idx_appended_to_leader(req.last_idx_appended_to_leader());
+
+ return result;
+ }
+} // namespace
namespace tserver {
const char* SCANNER_BYTES_READ_METRIC_NAME = "scanner_bytes_read";
@@ -294,24 +316,31 @@ bool LookupTabletReplicaOrRespond(TabletReplicaLookupIf* tablet_manager,
return true;
}
-template<class RespClass>
-void RespondTabletNotRunning(const scoped_refptr<TabletReplica>& replica,
- TabletStatePB tablet_state,
- RespClass* resp,
- RpcContext* context) {
+std::pair<Status, TabletServerErrorPB::Code> GetTabletNotRunningCode(
+ const scoped_refptr<TabletReplica>& replica,
+ const TabletStatePB& tablet_state
+ ) {
Status s = Status::IllegalState("Tablet not RUNNING",
tablet::TabletStatePB_Name(tablet_state));
- auto error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
if (replica->tablet_metadata()->tablet_data_state() ==
TABLET_DATA_TOMBSTONED ||
replica->tablet_metadata()->tablet_data_state() == TABLET_DATA_DELETED) {
// Treat tombstoned tablets as if they don't exist for most purposes.
// This takes precedence over failed, since we don't reset the failed
// status of a TabletReplica when deleting it. Only tablet copy does that.
- error_code = TabletServerErrorPB::TABLET_NOT_FOUND;
+ return {s, TabletServerErrorPB::TABLET_NOT_FOUND};
} else if (tablet_state == tablet::FAILED) {
s = s.CloneAndAppend(replica->error().ToString());
- error_code = TabletServerErrorPB::TABLET_FAILED;
+ return {s, TabletServerErrorPB::TABLET_FAILED};
}
+ return {s, TabletServerErrorPB::TABLET_NOT_RUNNING};
+}
+
+template<class RespClass>
+void RespondTabletNotRunning(const scoped_refptr<TabletReplica>& replica,
+ TabletStatePB tablet_state,
+ RespClass* resp,
+ RpcContext* context) {
+ auto [s, error_code] = GetTabletNotRunningCode(replica, tablet_state);
SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
}
@@ -1777,6 +1806,67 @@ void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req,
context->RespondSuccess();
}
+void ConsensusServiceImpl::MultiRaftUpdateConsensus(
+ const MultiRaftConsensusRequestPB* req,
+ MultiRaftConsensusResponsePB* resp,
+ RpcContext* context) {
+ DVLOG(3) << "Received Batched Consensus Update RPC: " << SecureDebugString(*req);
+ if (!CheckUuidMatchOrRespond(tablet_manager_, "UpdateConsensus", req, resp, context)) {
+ return;
+ }
+ resp->set_responder_uuid(tablet_manager_->NodeInstance().permanent_uuid());
+ for (const auto& single_req : req->consensus_requests()) {
+ auto* single_resp = resp->add_consensus_responses();
+ auto set_error = [single_resp](const Status& s, const TabletServerErrorPB::Code& error_code) {
+ auto error = single_resp->mutable_error();
+ StatusToPB(s, error->mutable_status());
+ error->set_code(error_code);
+ };
+ scoped_refptr<TabletReplica> replica;
+ Status s = tablet_manager_->GetTabletReplica(single_req.tablet_id(), &replica);
+ if (PREDICT_FALSE(!s.ok())) {
+ set_error(s, TabletServerErrorPB::TABLET_NOT_FOUND);
+ continue; // move to the next request
+ }
+
+ const auto& state = replica->state();
+ if (PREDICT_FALSE(state != tablet::RUNNING)) {
+ auto [s, error_code] = GetTabletNotRunningCode(replica, state);
+ set_error(s, error_code);
+ continue;
+ }
+ shared_ptr<RaftConsensus> consensus = replica->shared_consensus();
+
+ if (!consensus) {
+ set_error(Status::ServiceUnavailable("Raft Consensus unavailable",
+ "Tablet replica not initialized"),
+ TabletServerErrorPB::TABLET_NOT_RUNNING);
+ continue;
+ }
+
+ const auto req2 = ToSingleRequest(*req, single_req);
+ auto resp2 = ConsensusResponsePB();
+ // TODO(martonka): We know that this is a no-op. So maybe we could handle it
+ // in a more efficient way.
+ s = consensus->Update(&req2, &resp2);
+ if (PREDICT_FALSE(!s.ok())) {
+ // Clear the response first, since a partially-filled response could
+ // result in confusing a caller, or in having missing required fields
+ // in embedded optional messages.
+ single_resp->Clear();
+ set_error(s, TabletServerErrorPB::UNKNOWN_ERROR);
+ continue;
+ }
+ single_resp->set_responder_term(resp2.responder_term());
+ *single_resp->mutable_status() = resp2.status();
+ single_resp->set_server_quiescing(resp2.server_quiescing());
+ if (resp2.has_error()) {
+ *single_resp->mutable_error() = resp2.error();
+ }
+ }
+ context->RespondSuccess();
+}
+
void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req,
VoteResponsePB* resp,
RpcContext* context) {
diff --git a/src/kudu/tserver/tablet_service.h b/src/kudu/tserver/tablet_service.h
index ba482b9bc..2765fd371 100644
--- a/src/kudu/tserver/tablet_service.h
+++ b/src/kudu/tserver/tablet_service.h
@@ -52,6 +52,8 @@ class ChangeConfigRequestPB;
class ChangeConfigResponsePB;
class ConsensusRequestPB;
class ConsensusResponsePB;
+class MultiRaftConsensusRequestPB;
+class MultiRaftConsensusResponsePB;
class GetConsensusStateRequestPB;
class GetConsensusStateResponsePB;
class GetLastOpIdRequestPB;
@@ -256,6 +258,11 @@ class ConsensusServiceImpl : public consensus::ConsensusServiceIf {
consensus::ConsensusResponsePB* resp,
rpc::RpcContext* context) override;
+ void MultiRaftUpdateConsensus(
+ const consensus::MultiRaftConsensusRequestPB* req,
+ consensus::MultiRaftConsensusResponsePB* resp,
+ rpc::RpcContext* context) override;
+
void RequestConsensusVote(const consensus::VoteRequestPB* req,
consensus::VoteResponsePB* resp,
rpc::RpcContext* context) override;
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index f7f3269c1..f4af2c99b 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -43,6 +43,7 @@
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/multi_raft_batcher.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/quorum_util.h"
@@ -346,7 +347,10 @@ TSTabletManager::TSTabletManager(TabletServer* server)
shutdown_latch_(1),
metric_registry_(server->metric_registry()),
tablet_copy_metrics_(server->metric_entity()),
- state_(MANAGER_INITIALIZING) {
+ state_(MANAGER_INITIALIZING),
+ multi_raft_manager_(std::make_unique<consensus::MultiRaftManager>(
+ server_->dns_resolver(), server_->metric_entity())) {
+
// A heartbeat msg without statistics will be considered to be from an old
// version, thus it's necessary to trigger updating stats as soon as possible.
next_update_time_ = MonoTime::Now();
@@ -448,6 +452,8 @@ Status TSTabletManager::Init(Timer* start_tablets,
std::atomic<int>* tablets_total) {
CHECK_EQ(state(), MANAGER_INITIALIZING);
+ multi_raft_manager_->Init(server_->messenger(), server_->raft_pool());
+
// Start the tablet copy thread pool. We set a max queue size of 0 so that if
// the number of requests exceeds the number of threads, a
// SERVICE_UNAVAILABLE error may be returned to the remote caller.
@@ -1433,6 +1439,7 @@ void TSTabletManager::OpenTablet(const scoped_refptr<tablet::TabletReplica>& rep
server_->messenger(),
server_->result_tracker(),
log,
+ multi_raft_manager_.get(),
server_->tablet_prepare_pool(),
server_->dns_resolver());
if (!s.ok()) {
@@ -1535,6 +1542,10 @@ void TSTabletManager::Shutdown() {
txn_status_manager_pool_->Shutdown();
}
+ if (multi_raft_manager_ != nullptr) {
+ multi_raft_manager_->Shutdown();
+ }
+
// Take a snapshot of the replicas list -- that way we don't have to hold
// on to the lock while shutting them down, which might cause a lock
// inversion. (see KUDU-308 for example).
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 289ed0108..7597928b0 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -66,6 +66,7 @@ namespace consensus {
class ConsensusMetadataManager;
class OpId;
class StartTabletCopyRequestPB;
+class MultiRaftManager;
} // namespace consensus
namespace master {
@@ -504,6 +505,8 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
scoped_refptr<Histogram> create_tablet_run_time_;
scoped_refptr<Histogram> delete_tablet_run_time_;
+ std::unique_ptr<consensus::MultiRaftManager> multi_raft_manager_;
+
DISALLOW_COPY_AND_ASSIGN(TSTabletManager);
};