Signed-off-by: Ben Peart <[email protected]>
---
cache.h | 1 +
config.c | 5 +
environment.c | 1 +
unpack-trees.c | 313 ++++++++++++++++++++++++++++++++++++++++++++++++-
unpack-trees.h | 30 +++++
5 files changed, 348 insertions(+), 2 deletions(-)
diff --git a/cache.h b/cache.h
index d49092d94d..4bfa35c497 100644
--- a/cache.h
+++ b/cache.h
@@ -815,6 +815,7 @@ extern int fsync_object_files;
extern int core_preload_index;
extern int core_commit_graph;
extern int core_apply_sparse_checkout;
+extern int core_parallel_unpack_trees;
extern int precomposed_unicode;
extern int protect_hfs;
extern int protect_ntfs;
diff --git a/config.c b/config.c
index f4a208a166..34d5506588 100644
--- a/config.c
+++ b/config.c
@@ -1346,6 +1346,11 @@ static int git_default_core_config(const char *var, const char *value)
var, value);
}
+ if (!strcmp(var, "core.parallelunpacktrees")) {
+ core_parallel_unpack_trees = git_config_bool(var, value);
+ return 0;
+ }
+
/* Add other config variables here and to Documentation/config.txt. */
return 0;
}
diff --git a/environment.c b/environment.c
index 2a6de2330b..1eb0a05074 100644
--- a/environment.c
+++ b/environment.c
@@ -68,6 +68,7 @@ char *notes_ref_name;
int grafts_replace_parents = 1;
int core_commit_graph;
int core_apply_sparse_checkout;
+int core_parallel_unpack_trees;
int merge_log_config = -1;
int precomposed_unicode = -1; /* see probe_utf8_pathname_composition() */
unsigned long pack_size_limit_cfg;
diff --git a/unpack-trees.c b/unpack-trees.c
index 1f58efc6bb..2333626efd 100644
--- a/unpack-trees.c
+++ b/unpack-trees.c
@@ -17,6 +17,7 @@
#include "submodule-config.h"
#include "fsmonitor.h"
#include "fetch-object.h"
+#include "thread-utils.h"
/*
* Error messages expected by scripts out of plumbing commands such as
@@ -641,6 +642,98 @@ static inline int are_same_oid(struct name_entry *name_j, struct name_entry *nam
return name_j->oid && name_k->oid && !oidcmp(name_j->oid, name_k->oid);
}
+#ifndef NO_PTHREADS
+
+struct traverse_info_parallel {
+ struct mpmcq_entry entry;
+ struct tree_desc t[MAX_UNPACK_TREES];
+ void *buf[MAX_UNPACK_TREES];
+ struct traverse_info info;
+ int n;
+ int nr_buf;
+ int ret;
+};
+
+static int traverse_trees_parallel(int n, unsigned long dirmask,
+ unsigned long df_conflicts,
+ struct name_entry *names,
+ struct traverse_info *info)
+{
+ int i;
+ struct name_entry *p;
+ struct unpack_trees_options *o = info->data;
+ struct traverse_info_parallel *newinfo;
+
+ p = names;
+ while (!p->mode)
+ p++;
+
+ newinfo = xmalloc(sizeof(struct traverse_info_parallel));
+ mpmcq_entry_init(&newinfo->entry);
+ newinfo->info = *info;
+ newinfo->info.prev = info;
+ newinfo->info.pathspec = info->pathspec;
+ newinfo->info.name = *p;
+ newinfo->info.pathlen += tree_entry_len(p) + 1;
+ newinfo->info.df_conflicts |= df_conflicts;
+ newinfo->nr_buf = 0;
+ newinfo->n = n;
+
+ /*
+ * Fetch the tree from the ODB for each peer directory in the
+ * n commits.
+ *
+ * For 2- and 3-way traversals, we try to avoid hitting the
+ * ODB twice for the same OID. This should yield a nice speed
+ * up in checkouts and merges when the commits are similar.
+ *
+ * We don't bother doing the full O(n^2) search for larger n,
+ * because wider traversals don't happen that often and we
+ * avoid the search setup.
+ *
+ * When 2 peer OIDs are the same, we just copy the tree
+ * descriptor data. This implicitly borrows the buffer
+ * data from the earlier cell.
+ */
+ for (i = 0; i < n; i++, dirmask >>= 1) {
+ if (i > 0 && are_same_oid(&names[i], &names[i - 1]))
+ newinfo->t[i] = newinfo->t[i - 1];
+ else if (i > 1 && are_same_oid(&names[i], &names[i - 2]))
+ newinfo->t[i] = newinfo->t[i - 2];
+ else {
+ const struct object_id *oid = NULL;
+ if (dirmask & 1)
+ oid = names[i].oid;
+
+ /*
+ * fill_tree_descriptor() will load the tree from the
+ * ODB. Accessing the ODB is not thread safe so
+ * serialize access using the odb_mutex.
+ */
+ pthread_mutex_lock(&o->odb_mutex);
+ newinfo->buf[newinfo->nr_buf++] =
+ fill_tree_descriptor(newinfo->t + i, oid);
+ pthread_mutex_unlock(&o->odb_mutex);
+ }
+ }
+
+ /*
+ * We can't play games with the cache bottom as we are processing
+ * the tree objects in parallel.
+ * newinfo->bottom = switch_cache_bottom(&newinfo->info);
+ */
+
+ /* All I really need here is fetch_and_add() */
+ pthread_mutex_lock(&o->work_mutex);
+ o->remaining_work++;
+ pthread_mutex_unlock(&o->work_mutex);
+ mpmcq_push(&o->queue, &newinfo->entry);
+
+ return 0;
+}
+
+#endif
+
static int traverse_trees_recursive(int n, unsigned long dirmask,
unsigned long df_conflicts,
struct name_entry *names,
@@ -995,6 +1088,108 @@ static void debug_unpack_callback(int n,
debug_name_entry(i, names + i);
}
+static int unpack_callback_parallel(int n, unsigned long mask,
+ unsigned long dirmask,
+ struct name_entry *names,
+ struct traverse_info *info)
+{
+ struct cache_entry *src[MAX_UNPACK_TREES + 1] = {
+ NULL,
+ };
+ struct unpack_trees_options *o = info->data;
+ const struct name_entry *p = names;
+
+ /* Find first entry with a real name (we could use "mask" too) */
+ while (!p->mode)
+ p++;
+
+ if (o->debug_unpack)
+ debug_unpack_callback(n, mask, dirmask, names, info);
+
+ /* Are we supposed to look at the index too? */
+ if (o->merge) {
+ while (1) {
+ int cmp;
+ struct cache_entry *ce;
+
+ if (o->diff_index_cached)
+ ce = next_cache_entry(o);
+ else
+ ce = find_cache_entry(info, p);
+
+ if (!ce)
+ break;
+ cmp = compare_entry(ce, info, p);
+ if (cmp < 0) {
+ int ret;
+
+ pthread_mutex_lock(&o->unpack_index_entry_mutex);
+ ret = unpack_index_entry(ce, o);
+ pthread_mutex_unlock(&o->unpack_index_entry_mutex);
+ if (ret < 0)
+ return unpack_failed(o, NULL);
+ continue;
+ }
+ if (!cmp) {
+ if (ce_stage(ce)) {
+ /*
+ * If we skip unmerged index
+ * entries, we'll skip this
+ * entry *and* the tree
+ * entries associated with it!
+ */
+ if (o->skip_unmerged) {
+ add_same_unmerged(ce, o);
+ return mask;
+ }
+ }
+ src[0] = ce;
+ }
+ break;
+ }
+ }
+
+ pthread_mutex_lock(&o->unpack_nondirectories_mutex);
+ int ret = unpack_nondirectories(n, mask, dirmask, src, names, info);
+ pthread_mutex_unlock(&o->unpack_nondirectories_mutex);
+ if (ret < 0)
+ return -1;
+
+ if (o->merge && src[0]) {
+ if (ce_stage(src[0]))
+ mark_ce_used_same_name(src[0], o);
+ else
+ mark_ce_used(src[0], o);
+ }
+
+ /* Now handle any directories.. */
+ if (dirmask) {
+ /* special case: "diff-index --cached" looking at a tree */
+ if (o->diff_index_cached && n == 1 && dirmask == 1 &&
+ S_ISDIR(names->mode)) {
+ int matches;
+ matches = cache_tree_matches_traversal(
+ o->src_index->cache_tree, names, info);
+ /*
+ * Everything under the name matches; skip the
+ * entire hierarchy. diff_index_cached codepath
+ * special cases D/F conflicts in such a way that
+ * it does not do any look-ahead, so this is safe.
+ */
+ if (matches) {
+ o->cache_bottom += matches;
+ return mask;
+ }
+ }
+
+ if (traverse_trees_parallel(n, dirmask, mask & ~dirmask, names, info) < 0)
+ return -1;
+ return mask;
+ }
+
+ return mask;
+}
+
static int unpack_callback(int n, unsigned long mask, unsigned long dirmask,
struct name_entry *names, struct traverse_info *info)
{
struct cache_entry *src[MAX_UNPACK_TREES + 1] = { NULL, };
@@ -1263,6 +1458,116 @@ static void mark_new_skip_worktree(struct exclude_list *el,
static int verify_absent(const struct cache_entry *,
enum unpack_trees_error_types,
struct unpack_trees_options *);
+
+#ifndef NO_PTHREADS
+static void *traverse_trees_parallel_thread_proc(void *_data)
+{
+ struct unpack_trees_options *o = _data;
+ struct traverse_info_parallel *info;
+ int i;
+
+ while (1) {
+ info = (struct traverse_info_parallel *)mpmcq_pop(&o->queue);
+ if (!info)
+ break;
+
+ info->ret = traverse_trees(info->n, info->t, &info->info);
+ /*
+ * We can't play games with the cache bottom as we are processing
+ * the tree objects in parallel.
+ * restore_cache_bottom(&info->info, info->bottom);
+ */
+
+ for (i = 0; i < info->nr_buf; i++)
+ free(info->buf[i]);
+ /*
+ * TODO: Can't free "info" when thread is done because it can be used
+ * as ->prev link in child info objects. Ref count? Free all at end?
+ free(info);
+ */
+
+ /* All I really need here is fetch_and_add() */
+ pthread_mutex_lock(&o->work_mutex);
+ o->remaining_work--;
+ if (o->remaining_work == 0)
+ mpmcq_cancel(&o->queue);
+ pthread_mutex_unlock(&o->work_mutex);
+ }
+
+ return NULL;
+}
+
+static void init_parallel_traverse(struct unpack_trees_options *o,
+ struct traverse_info *info)
+{
+ /*
+ * TODO: Add logic to bypass parallel path when not needed.
+ * - not enough CPU cores to help
+ * - 'git status' is always fast - how to detect?
+ * - small trees (may be able to use index size as proxy, small index likely means small commit tree)
+ */
+ if (core_parallel_unpack_trees) {
+ int t;
+
+ mpmcq_init(&o->queue);
+ o->remaining_work = 0;
+ pthread_mutex_init(&o->unpack_nondirectories_mutex, NULL);
+ pthread_mutex_init(&o->unpack_index_entry_mutex, NULL);
+ pthread_mutex_init(&o->odb_mutex, NULL);
+ pthread_mutex_init(&o->work_mutex, NULL);
+ o->nr_threads = online_cpus();
+ o->pthreads = xcalloc(o->nr_threads, sizeof(pthread_t));
+ info->fn = unpack_callback_parallel;
+
+ for (t = 0; t < o->nr_threads; t++) {
+ if (pthread_create(&o->pthreads[t], NULL,
+ traverse_trees_parallel_thread_proc,
+ o))
+ die("unable to create traverse_trees_parallel_thread");
+ }
+ }
+}
+
+static void wait_parallel_traverse(struct unpack_trees_options *o)
+{
+ /*
+ * The first tree (root directory) is processed on the main thread.
+ * This function is called after it has completed. If there is no
+ * remaining work, we know we are finished.
+ */
+ if (core_parallel_unpack_trees) {
+ int t;
+
+ pthread_mutex_lock(&o->work_mutex);
+ if (o->remaining_work == 0)
+ mpmcq_cancel(&o->queue);
+ pthread_mutex_unlock(&o->work_mutex);
+
+ for (t = 0; t < o->nr_threads; t++) {
+ if (pthread_join(o->pthreads[t], NULL))
+ die("unable to join traverse_trees_parallel_thread");
+ }
+
+ free(o->pthreads);
+ pthread_mutex_destroy(&o->work_mutex);
+ pthread_mutex_destroy(&o->odb_mutex);
+ pthread_mutex_destroy(&o->unpack_index_entry_mutex);
+ pthread_mutex_destroy(&o->unpack_nondirectories_mutex);
+ mpmcq_destroy(&o->queue);
+ }
+}
+#else
+static void init_parallel_traverse(struct unpack_trees_options *o)
+{
+ return;
+}
+
+static void wait_parallel_traverse(struct unpack_trees_options *o)
+{
+ return;
+}
+#endif
+
/*
* N-way merge "len" trees. Returns 0 on success, -1 on failure to manipulate the
* resulting index, -2 on failure to reflect the changes to the work tree.
@@ -1327,6 +1632,7 @@ int unpack_trees(unsigned len, struct tree_desc *t, struct unpack_trees_options
const char *prefix = o->prefix ? o->prefix : "";
struct traverse_info info;
uint64_t start;
+ int ret;
setup_traverse_info(&info, prefix);
info.fn = unpack_callback;
@@ -1352,9 +1658,12 @@ int unpack_trees(unsigned len, struct tree_desc *t, struct unpack_trees_options
}
start = getnanotime();
- if (traverse_trees(len, t, &info) < 0)
- goto return_failed;
+ init_parallel_traverse(o, &info);
+ ret = traverse_trees(len, t, &info);
+ wait_parallel_traverse(o);
trace_performance_since(start, "traverse_trees");
+ if (ret < 0)
+ goto return_failed;
}
/* Any left-over entries in the index? */
diff --git a/unpack-trees.h b/unpack-trees.h
index c2b434c606..b7140099fa 100644
--- a/unpack-trees.h
+++ b/unpack-trees.h
@@ -3,6 +3,11 @@
#include "tree-walk.h"
#include "argv-array.h"
+#ifndef NO_PTHREADS
+#include "git-compat-util.h"
+#include <pthread.h>
+#include "mpmcqueue.h"
+#endif
#define MAX_UNPACK_TREES 8
@@ -80,6 +85,31 @@ struct unpack_trees_options {
struct index_state result;
struct exclude_list *el; /* for internal use */
+#ifndef NO_PTHREADS
+ /*
+ * Speed up the tree traversal by adding all discovered tree objects
+ * into a queue and have a pool of worker threads process them in
+ * parallel. Since there is no upper bound on the size of a tree and
+ * each worker thread will be adding discovered tree objects to the
+ * queue, we need an unbounded multi-producer-multi-consumer queue.
+ */
+ struct mpmcq queue;
+
+ int nr_threads;
+ pthread_t *pthreads;
+
+ /* need a mutex as we don't have fetch_and_add() */
+ int remaining_work;
+ pthread_mutex_t work_mutex;
+
+ /* The ODB is not thread safe so we must serialize access to it */
+ pthread_mutex_t odb_mutex;
+
+ /* various functions that are not thread safe and must be serialized for now */
+ pthread_mutex_t unpack_index_entry_mutex;
+ pthread_mutex_t unpack_nondirectories_mutex;
+
+#endif
};
extern int unpack_trees(unsigned n, struct tree_desc *t,
--
2.17.0.gvfs.1.123.g449c066