On Sat, Apr 16, 2022 at 06:16:23PM -0600, David Fifield wrote: > I am trying to reproduce the "frac" computation from the Reproducible > Metrics instructions: > https://metrics.torproject.org/reproducible-metrics.html#relay-users > Which is also Section 3 in the tech report on counting bridge users: > https://research.torproject.org/techreports/counting-daily-bridge-users-2012-10-24.pdf#page=4 > > h(R^H) * n(H) + h(H) * n(R\H) > frac = ----------------------------- > h(H) * n(N) > > My minor goal is to reproduce the "frac" column from the Metrics web > site (which I assume is the same as the frac above, expressed as a > percentage): > > https://metrics.torproject.org/userstats-relay-country.csv?start=2022-04-01&end=2022-04-08&country=all&events=off > date,country,users,lower,upper,frac > 2022-04-01,,2262557,,,92 > 2022-04-02,,2181639,,,92 > 2022-04-03,,2179544,,,93 > 2022-04-04,,2350360,,,93 > 2022-04-05,,2388772,,,93 > 2022-04-06,,2356170,,,93 > 2022-04-07,,2323184,,,93 > 2022-04-08,,2310170,,,91 > > I'm having trouble with the computation of n(R\H) and h(R∧H). I > understand that R is the subset of relays that report directory request > counts (i.e. that have dirreq-stats-end in their extra-info descriptors) > and H is the subset of relays that report directory request byte counts > (i.e. that have dirreq-write-history in their extra-info descriptors). > R and H partially overlap: there are relays that are in R but not H, > others that are in H but not R, and others that are in both. > > The computations depend on some values that are directly from > descriptors: > n(R) = sum of hours, for relays with directory request counts > n(H) = sum of hours, for relays with directory write histories > h(H) = sum of written bytes, for relays with directory write histories > > ... > > Using the formulas and assumptions above, here's my attempt at computing > recent "frac" values: > > date `n(N)` `n(H)` `h(H)` `n(R)` `n(R\H)` `h(R∧H)` frac > 2022-04-01 166584 177638. 2.24e13 125491. 0 1.59e13 0.753 > 2022-04-02 166951 177466. 2.18e13 125686. 0 1.54e13 0.753 > 2022-04-03 167100 177718. 2.27e13 127008. 0 1.62e13 0.760 > 2022-04-04 166970 177559. 2.43e13 126412. 0 1.73e13 0.757 > 2022-04-05 166729 177585. 2.44e13 125389. 0 1.72e13 0.752 > 2022-04-06 166832 177470. 2.39e13 127077. 0 1.71e13 0.762 > 2022-04-07 166532 177210. 2.48e13 127815. 0 1.79e13 0.768 > 2022-04-08 167695 176879. 2.52e13 127697. 0 1.82e13 0.761
I tried computing n(R\H) and h(R∧H) from the definitions, rather than by using the formulas in the Reproducible Metrics guide. This achieves an almost matching "frac" column, though it is still about 1% too high. date `n(N)` `n(H)` `h(H)` `n(R)` `n(R\H)` `h(R∧H)` frac 2022-04-01 166584 177638. 2.24e13 125491. 90.9 1.96e13 0.930 2022-04-02 166951 177466. 2.18e13 125686. 181. 1.92e13 0.937 2022-04-03 167100 177718. 2.27e13 127008. 154. 2.00e13 0.942 2022-04-04 166970 177559. 2.43e13 126412. 134. 2.14e13 0.936 2022-04-05 166729 177585. 2.44e13 125389. 94.6 2.15e13 0.938 2022-04-06 166832 177470. 2.39e13 127077. 162. 2.11e13 0.940 2022-04-07 166532 177210. 2.48e13 127815. 102. 2.18e13 0.938 2022-04-08 167695 176879. 2.52e13 127697. 158. 2.21e13 0.926 I got this by taking an explicit set intersection between the R and H time intervals. So, for example, if the intervals making up n(R) and n(H) are (with their lengths): n(R) [---10---] [----12----] [---9---] n(H) [----12----] [------16------] [--7--] Then the intersection n(R∧H) is: n(R∧H) [-5-] [-5-] [3] [3] h(R∧H) comes pro-rating the n(H) intervals, each of which is associated with an h(H) byte count). Suppose the [----12----] interval represents 1000 bytes. Then each of the [-5-] intervals that result from it in the intersection are worth 5/12 × 1000 = 417 bytes. We get n(R\H) from n(R) − n(R∧H): n(R\H) [-5-] [4-] [-6--] This seems overall more correct, though it required a more elaborate computation than the Reproducible Metrics guide prescribes. I'm still not sure why it does not match exactly, and I would still appreciate a pointer to where Tor Metrics does the "frac" computation. I was initially interested in this for the purpose of better estimating the number of Snowflake users. But now I've decided "frac" is not useful for that purpose: since there is only one bridge we care about, it does not make sense to adjust the numbers to account for other bridges that may not report the same set of statistics. I don't plan to take this investigation any further for the time being, but here is source code to reproduce the above tables. You will need: https://collector.torproject.org/archive/relay-descriptors/consensuses/consensuses-2022-04.tar.xz https://collector.torproject.org/archive/relay-descriptors/extra-infos/extra-infos-2022-04.tar.xz ./relay_uptime.py consensuses-2022-04.tar.xz > relay_uptime.csv ./relay_dir.py extra-infos-2022-04.tar.xz > relay_dir.csv ./frac.py relay_uptime.csv relay_dir.csv
#!/usr/bin/env python3 import getopt import multiprocessing import sys import stem import stem.descriptor import stem.descriptor.reader import stem.descriptor.networkstatus import numpy as np import pandas as pd import common def process_network_status(network_status): assert type(network_status) == stem.descriptor.networkstatus.NetworkStatusDocumentV3, type(network_status) data = { "date": [], "relay_uptime_hours": [], } # We assume the intervals seen by this function are non-overlapping. num_running = sum(stem.Flag.RUNNING in router.flags for router in network_status.routers.values()) for (date, frac_int, _) in common.segment_datetime_interval(network_status.valid_after, network_status.fresh_until): data["date"].append(date) data["relay_uptime_hours"].append(num_running * frac_int) return pd.DataFrame(data) def process_file(f): with stem.descriptor.reader.DescriptorReader([f], document_handler = stem.descriptor.DocumentHandler.DOCUMENT) as reader: return ( pd.concat(process_network_status(desc) for desc in reader) .groupby("date").sum().reset_index() ) if __name__ == "__main__": _, inputs = getopt.gnu_getopt(sys.argv[1:], "") with multiprocessing.Pool(common.NUM_PROCESSES) as pool: ( pd.concat(pool.imap_unordered(process_file, inputs)) .groupby("date").sum().reset_index() ).to_csv(sys.stdout, index = False, float_format = "%.2f", columns = [ "date", "relay_uptime_hours", ])
date,relay_uptime_hours 2022-04-01,166584.00 2022-04-02,166951.00 2022-04-03,167100.00 2022-04-04,166970.00 2022-04-05,166729.00 2022-04-06,166832.00 2022-04-07,166532.00 2022-04-08,167695.00 2022-04-09,167592.00 2022-04-10,167801.00 2022-04-11,167098.00 2022-04-12,166777.00 2022-04-13,166411.00 2022-04-14,27594.00
#!/usr/bin/env python3 import datetime import getopt import multiprocessing import sys import stem import stem.descriptor import stem.descriptor.reader import stem.descriptor.networkstatus import stem.descriptor.extrainfo_descriptor import numpy as np import pandas as pd import common def intersect_intervals(a, b): a = list(sorted(a)) b = list(sorted(b)) result = [] i = 0 j = 0 while i < len(a) and j < len(b): if a[i][0] < b[j][1] and a[i][1] > b[j][0]: result.append((max(a[i][0], b[j][0]), min(a[i][1], b[j][1]), i, j)) # Advance whichever sequence of intervals currently has the leftmost # right edge. if a[i][1] < b[j][1]: i += 1 else: j += 1 return result def process_relay_extra_infos(reader): dir_write_history = { "published": [], "fingerprint": [], "nickname": [], "begin": [], "end": [], "bytes": [], } dir_stats = { "published": [], "fingerprint": [], "nickname": [], "begin": [], "end": [], "resp_ok": [], } for desc in reader: assert type(desc) == stem.descriptor.extrainfo_descriptor.RelayExtraInfoDescriptor, type(desc) if desc.dir_write_history_end is not None \ and desc.published - desc.dir_write_history_end < common.END_THRESHOLD \ and datetime.timedelta(seconds = desc.dir_write_history_interval) < common.INTERVAL_THRESHOLD: # Break the write history into separate rows, one for each interval. end = desc.dir_write_history_end for value in reversed(desc.dir_write_history_values): begin = end - datetime.timedelta(seconds = desc.dir_write_history_interval) dir_write_history["published"].append(desc.published) dir_write_history["fingerprint"].append(desc.fingerprint) dir_write_history["nickname"].append(desc.nickname) dir_write_history["begin"].append(begin) dir_write_history["end"].append(end) dir_write_history["bytes"].append(value) end = begin if desc.dir_stats_end is not None \ and desc.published - desc.dir_stats_end < common.END_THRESHOLD \ and datetime.timedelta(seconds = desc.dir_stats_interval) < common.INTERVAL_THRESHOLD: resp_ok = desc.dir_v3_responses[stem.descriptor.extrainfo_descriptor.DirResponse.OK] - 4 if resp_ok > 0: dir_stats["published"].append(desc.published) dir_stats["fingerprint"].append(desc.fingerprint) dir_stats["nickname"].append(desc.nickname) dir_stats["begin"].append(desc.dir_stats_end - datetime.timedelta(seconds = desc.dir_stats_interval)) dir_stats["end"].append(desc.dir_stats_end) dir_stats["resp_ok"].append(resp_ok) # Different descriptors for the same relay contain overlapping write # histories. Keep only the most recent "published" for each "end". dir_write_history = ( pd.DataFrame(dir_write_history) .sort_values("published") .groupby(["fingerprint", "nickname", "end"]) .last() .reset_index() ) # Do the same for directory responses, though we don't expect these to # overlap. dir_stats = ( pd.DataFrame(dir_stats) .sort_values("published") .groupby(["fingerprint", "nickname", "end"]) .last() .reset_index() ) # Now compute the intervals, for each relay, which are covered by *both* # dir_write_history and dir_stats. both = [] dir_write_history_grouped = dir_write_history.groupby(["fingerprint", "nickname"]) dir_stats_grouped = dir_stats.groupby(["fingerprint", "nickname"]) for (fingerprint, nickname), dir_write_history_group in dir_write_history_grouped: try: dir_stats_group = dir_stats_grouped.get_group((fingerprint, nickname)) except KeyError: continue # Find the intersection, H∧R, of write history intervals and dir stats # intervals. dir_write_history_intervals = [(row.begin, row.end) for row in dir_write_history_group.itertuples()] dir_stats_intervals = [(row.begin, row.end) for row in dir_stats_group.itertuples()] intersection = intersect_intervals(dir_write_history_intervals, dir_stats_intervals) if not intersection: continue # Each tuple returned by intersect_intervals contains: # [0]: beginning of interval in intersection # [1]: end of interval in intersection # [2]: index in dir_write_history_intervals that contributes to this interval # [3]: index in dir_stats_intervals that contributes to this interval # We make a joint dataframe that maps the intersection intervals # (ibegin = [0], iend = [1]) to their [2] corresponding intervals in # dir_write_history_intervals, along with their byte counts. We use this # to scale the byte counts for the intersection intervals. joint = pd.concat([ pd.DataFrame({ "ibegin": [x[0] for x in intersection], "iend": [x[1] for x in intersection], }), dir_write_history_group.iloc[[x[2] for x in intersection]][["begin", "end", "bytes"]].reset_index(drop = True), ], axis = 1) both.append(pd.DataFrame({ "fingerprint": fingerprint, "nickname": nickname, "begin": joint["ibegin"], "end": joint["iend"], "bytes": joint["bytes"] * (pd.TimedeltaIndex(joint["iend"] - joint["ibegin"]).to_pytimedelta() / pd.TimedeltaIndex(joint["end"] - joint["begin"]).to_pytimedelta()), })) both = pd.concat(both) # Sum by date over all relays. dir_write_history_bydate = { "date": [], "relay_dir_write_hours": [], "relay_dir_write_bytes": [], } dir_stats_bydate = { "date": [], "relay_dir_stats_hours": [], "relay_dir_stats_resp_ok": [], } both_bydate = { "date": [], "both_hours": [], "both_bytes": [], } for row in dir_write_history.itertuples(): for (date, frac_int, _) in common.segment_datetime_interval(row.begin, row.end): dir_write_history_bydate["date"].append(date) dir_write_history_bydate["relay_dir_write_hours"].append((row.end - row.begin) / datetime.timedelta(hours = 1) * frac_int) dir_write_history_bydate["relay_dir_write_bytes"].append(row.bytes * frac_int) for row in dir_stats.itertuples(): for (date, frac_int, _) in common.segment_datetime_interval(row.begin, row.end): dir_stats_bydate["date"].append(date) dir_stats_bydate["relay_dir_stats_hours"].append((row.end - row.begin) / datetime.timedelta(hours = 1) * frac_int) dir_stats_bydate["relay_dir_stats_resp_ok"].append(row.resp_ok * frac_int) for row in both.itertuples(): for (date, frac_int, _) in common.segment_datetime_interval(row.begin, row.end): both_bydate["date"].append(date) both_bydate["both_hours"].append((row.end - row.begin) / datetime.timedelta(hours = 1) * frac_int) both_bydate["both_bytes"].append(row.bytes * frac_int) dir_write_history_bydate = ( pd.DataFrame(dir_write_history_bydate) .groupby("date").sum().reset_index() ) dir_stats_bydate = ( pd.DataFrame(dir_stats_bydate) .groupby("date").sum().reset_index() ) both_bydate = ( pd.DataFrame(both_bydate) .groupby("date").sum().reset_index() ) return pd.merge( pd.merge(dir_write_history_bydate, dir_stats_bydate, on = ["date"], how = "outer"), both_bydate, on = ["date"], how = "outer", ) def process_file(f): with stem.descriptor.reader.DescriptorReader([f]) as reader: return process_relay_extra_infos(reader) if __name__ == "__main__": _, inputs = getopt.gnu_getopt(sys.argv[1:], "") with multiprocessing.Pool(common.NUM_PROCESSES) as pool: ( pd.concat(pool.imap_unordered(process_file, inputs)) .groupby("date").sum().reset_index() ).to_csv(sys.stdout, index = False, float_format = "%.2f", columns = [ "date", "relay_dir_write_hours", "relay_dir_write_bytes", "relay_dir_stats_hours", "relay_dir_stats_resp_ok", "both_hours", "both_bytes", ])
date,relay_dir_write_hours,relay_dir_write_bytes,relay_dir_stats_hours,relay_dir_stats_resp_ok,both_hours,both_bytes 2022-03-20,82.91,1736713782.47,0.00,0.00,0.00,0.00 2022-03-21,421.90,6437321840.55,0.00,0.00,0.00,0.00 2022-03-22,970.53,8279352647.53,0.00,0.00,0.00,0.00 2022-03-23,1579.50,13942016383.53,0.00,0.00,0.00,0.00 2022-03-24,2558.18,16947613780.65,0.00,0.00,0.00,0.00 2022-03-25,5265.61,31445067926.07,0.00,0.00,0.00,0.00 2022-03-26,49073.66,5239496653355.36,0.00,0.00,0.00,0.00 2022-03-27,158442.15,20979790952547.85,0.00,0.00,0.00,0.00 2022-03-28,172902.40,23790345564006.61,0.00,0.00,0.00,0.00 2022-03-29,175311.71,23872774104777.91,0.00,0.00,0.00,0.00 2022-03-30,176491.41,24071621689387.25,27273.67,5126398.60,27235.07,5104977982724.75 2022-03-31,177534.60,23529414875973.20,109920.94,19643266.92,109835.57,18734199905639.44 2022-04-01,177638.09,22439702932089.59,125491.45,21001365.60,125400.58,19561513440430.52 2022-04-02,177466.08,21760791688019.58,125685.84,20363465.06,125504.89,19162123059384.01 2022-04-03,177717.51,22650212443851.81,127008.39,20455819.55,126854.33,20044713521642.67 2022-04-04,177559.24,24329589093181.60,126412.11,21929666.19,126277.69,21407202944760.75 2022-04-05,177585.25,24395314853928.89,125388.58,22312434.77,125294.95,21462939747109.41 2022-04-06,177470.40,23918457487092.34,127077.40,22067464.65,126915.31,21102579463539.30 2022-04-07,177210.36,24768051969233.11,127814.89,21760745.43,127713.06,21811098715725.13 2022-04-08,176879.16,25187290225432.38,127697.28,21309827.80,127539.29,22087431800503.34 2022-04-09,176262.96,23593325365455.48,126498.62,20655939.77,126337.55,20627973646625.13 2022-04-10,175357.06,22622047200557.93,125273.01,20220477.13,125078.36,20043119503663.02 2022-04-11,173470.74,25317282391208.27,125630.57,21293321.28,125232.90,22026316726563.52 2022-04-12,162104.32,24604704321104.66,121130.54,20856377.21,116487.69,20913938468593.55 2022-04-13,64924.25,9802425975580.58,50547.50,8701435.60,33860.77,6183056411127.21 2022-04-14,192.04,30127318662.96,266.81,39914.44,49.49,3116201356.98
#!/usr/bin/env python3 import getopt import sys import numpy as np import pandas as pd if __name__ == "__main__": _, (relay_uptime_csv_filename, relay_dir_csv_filename) = getopt.gnu_getopt(sys.argv[1:], "") relay_uptime = pd.read_csv(relay_uptime_csv_filename) relay_dir = pd.read_csv(relay_dir_csv_filename) j = ( pd.merge(relay_uptime, relay_dir, on = "date", how = "inner") .rename(columns = { "relay_uptime_hours": "n(N)", "relay_dir_write_hours": "n(H)", "relay_dir_write_bytes": "h(H)", "relay_dir_stats_hours": "n(R)", "both_hours": "n(R∧H)", "both_bytes": "h(R∧H)", }) ) j["n(R\\H)"] = j["n(R)"] - j["n(R∧H)"] # Uncomment these to use the formulas for n(R\H) and h(R∧H) from # https://metrics.torproject.org/reproducible-metrics.html#relay-users # j["n(R\\H)"] = np.maximum(0, j["n(R)"] - j["n(H)"]) # j["h(R∧H)"] = np.minimum(j["n(R)"], j["n(H)"]) / np.maximum(j["n(R)"], j["n(H)"]) * j["h(H)"] j["frac"] = (j["h(R∧H)"] * j["n(H)"] + j["h(H)"] * j["n(R\\H)"]) / (j["h(H)"] * j["n(N)"]) print(j[[ "date", "n(N)", "n(H)", "h(H)", "n(R)", "n(R∧H)", "h(R∧H)", "n(R\H)", "frac", ]])
_______________________________________________ tor-dev mailing list tor-dev@lists.torproject.org https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-dev