Author: stefan2
Date: Fri Jan 25 11:15:05 2013
New Revision: 1438453
URL: http://svn.apache.org/viewvc?rev=1438453&view=rev
Log:
On the fsfs-format7 branch: Implement layout optimizing fsfs packing
for format7 repos. Code works but still needs documentation and cleanup.
* subversion/libsvn_fs_fs/pack.c
(rep_info_t,
pack_context_t): new data types
(initialize_pack_context,
reset_pack_context,
close_pack_context,
copy_file_data,
copy_item_to_temp,
get_item_array_index,
add_item_rep_mapping,
copy_rep_to_temp,
compare_dir_entries,
copy_node_to_temp,
compare_p2l_info,
sort_items,
compare_p2l_info_rev,
sort_by_rev,
pick_recursively,
sort_reps,
copy_items_from_temp,
append_entries,
write_l2p_index,
pack_section,
append_revision): various new utility functions
(pack_log_addressed): new main pack function for format7 repos
(pack_phys_addressed): new main pack function for format6 repos
factored out from pack_rev_shared
(copy_indexes): drop obsolete function
(pack_rev_shard): simplify; call the suitable pack sub-function
(pack_shard): provide extra parameter
Modified:
subversion/branches/fsfs-format7/subversion/libsvn_fs_fs/pack.c
Modified: subversion/branches/fsfs-format7/subversion/libsvn_fs_fs/pack.c
URL:
http://svn.apache.org/viewvc/subversion/branches/fsfs-format7/subversion/libsvn_fs_fs/pack.c?rev=1438453&r1=1438452&r2=1438453&view=diff
==============================================================================
--- subversion/branches/fsfs-format7/subversion/libsvn_fs_fs/pack.c (original)
+++ subversion/branches/fsfs-format7/subversion/libsvn_fs_fs/pack.c Fri Jan 25
11:15:05 2013
@@ -19,8 +19,11 @@
* under the License.
* ====================================================================
*/
+#include <assert.h>
+
#include "svn_pools.h"
#include "svn_dirent_uri.h"
+#include "svn_sorts.h"
#include "private/svn_temp_serializer.h"
#include "fs_fs.h"
@@ -29,12 +32,856 @@
#include "revprops.h"
#include "transaction.h"
#include "index.h"
+#include "low_level.h"
+#include "cached_data.h"
#include "../libsvn_fs/fs-loader.h"
#include "svn_private_config.h"
#include "temp_serializer.h"
+typedef struct rep_info_t
+{
+ struct svn_fs_fs__p2l_entry_t *entry;
+ struct rep_info_t *base;
+ struct rep_info_t *next;
+} rep_info_t;
+
+typedef struct pack_context_t
+{
+ svn_fs_t *fs;
+ svn_cancel_func_t cancel_func;
+ void *cancel_baton;
+
+ svn_revnum_t shard_rev;
+ svn_revnum_t start_rev;
+ svn_revnum_t end_rev;
+ svn_revnum_t shard_end_rev;
+
+ apr_file_t *proto_l2p_index;
+ apr_file_t *proto_p2l_index;
+
+ const char *shard_dir;
+ const char *pack_file_dir;
+ const char *pack_file_path;
+ apr_off_t pack_offset;
+ apr_file_t *pack_file;
+
+ apr_array_header_t *changes;
+ apr_file_t *changes_file;
+ apr_array_header_t *file_props;
+ apr_file_t *file_props_file;
+ apr_array_header_t *dir_props;
+ apr_file_t *dir_props_file;
+
+ apr_array_header_t *rev_offsets;
+ apr_array_header_t *reps_infos;
+ apr_array_header_t *reps;
+ apr_file_t *reps_file;
+
+ apr_pool_t *info_pool;
+} pack_context_t;
+
+static svn_error_t *
+initialize_pack_context(pack_context_t *context,
+ svn_fs_t *fs,
+ const char *pack_file_dir,
+ const char *shard_dir,
+ svn_revnum_t shard_rev,
+ apr_size_t max_items,
+ svn_cancel_func_t cancel_func,
+ void *cancel_baton,
+ apr_pool_t *pool)
+{
+ fs_fs_data_t *ffd = fs->fsap_data;
+ const char *temp_dir;
+ apr_size_t max_revs = MIN(ffd->max_files_per_dir, (int)max_items);
+
+ SVN_ERR_ASSERT(ffd->format >= SVN_FS_FS__MIN_LOG_ADDRESSING_FORMAT);
+ SVN_ERR_ASSERT(shard_rev % ffd->max_files_per_dir == 0);
+
+ SVN_ERR(svn_io_temp_dir(&temp_dir, pool));
+
+ context->fs = fs;
+ context->cancel_func = cancel_func;
+ context->cancel_baton = cancel_baton;
+
+ context->shard_rev = shard_rev;
+ context->start_rev = shard_rev;
+ context->end_rev = shard_rev;
+ context->shard_end_rev = shard_rev + ffd->max_files_per_dir;
+
+ /* Create the new directory and pack file. */
+ context->shard_dir = shard_dir;
+ context->pack_file_dir = pack_file_dir;
+ context->pack_file_path
+ = svn_dirent_join(pack_file_dir, PATH_PACKED, pool);
+ SVN_ERR(svn_io_file_open(&context->pack_file, context->pack_file_path,
+ APR_WRITE | APR_BUFFERED | APR_BINARY | APR_EXCL
+ | APR_CREATE, APR_OS_DEFAULT, pool));
+
+ /* Index information files */
+ SVN_ERR(svn_fs_fs__l2p_proto_index_open
+ (&context->proto_l2p_index,
+ svn_dirent_join(pack_file_dir,
+ PATH_INDEX PATH_EXT_L2P_INDEX,
+ pool),
+ pool));
+ SVN_ERR(svn_fs_fs__p2l_proto_index_open
+ (&context->proto_p2l_index,
+ svn_dirent_join(pack_file_dir,
+ PATH_INDEX PATH_EXT_P2L_INDEX,
+ pool),
+ pool));
+
+ context->changes = apr_array_make(pool, max_items,
+ sizeof(svn_fs_fs__p2l_entry_t *));
+ SVN_ERR(svn_io_open_unique_file3(&context->changes_file, NULL, temp_dir,
+ svn_io_file_del_on_close, pool, pool));
+ context->file_props = apr_array_make(pool, max_items,
+ sizeof(svn_fs_fs__p2l_entry_t *));
+ SVN_ERR(svn_io_open_unique_file3(&context->file_props_file, NULL, temp_dir,
+ svn_io_file_del_on_close, pool, pool));
+ context->dir_props = apr_array_make(pool, max_items,
+ sizeof(svn_fs_fs__p2l_entry_t *));
+ SVN_ERR(svn_io_open_unique_file3(&context->dir_props_file, NULL, temp_dir,
+ svn_io_file_del_on_close, pool, pool));
+
+ context->rev_offsets = apr_array_make(pool, max_revs, sizeof(int));
+ context->reps_infos = apr_array_make(pool, max_items, sizeof(rep_info_t *));
+ context->reps = apr_array_make(pool, max_items,
+ sizeof(svn_fs_fs__p2l_entry_t *));
+ SVN_ERR(svn_io_open_unique_file3(&context->reps_file, NULL, temp_dir,
+ svn_io_file_del_on_close, pool, pool));
+
+ context->info_pool = svn_pool_create(pool);
+
+ return SVN_NO_ERROR;
+};
+
+static svn_error_t *
+reset_pack_context(pack_context_t *context,
+ apr_pool_t *pool)
+{
+ apr_array_clear(context->changes);
+ SVN_ERR(svn_io_file_trunc(context->changes_file, 0, pool));
+ apr_array_clear(context->file_props);
+ SVN_ERR(svn_io_file_trunc(context->file_props_file, 0, pool));
+ apr_array_clear(context->dir_props);
+ SVN_ERR(svn_io_file_trunc(context->dir_props_file, 0, pool));
+
+ apr_array_clear(context->rev_offsets);
+ apr_array_clear(context->reps_infos);
+ apr_array_clear(context->reps);
+ SVN_ERR(svn_io_file_trunc(context->reps_file, 0, pool));
+
+ svn_pool_clear(context->info_pool);
+
+ return SVN_NO_ERROR;
+};
+
+static svn_error_t *
+close_pack_context(pack_context_t *context,
+ apr_pool_t *pool)
+{
+ const char *l2p_index_path
+ = apr_pstrcat(pool, context->pack_file_path, PATH_EXT_L2P_INDEX, NULL);
+ const char *p2l_index_path
+ = apr_pstrcat(pool, context->pack_file_path, PATH_EXT_P2L_INDEX, NULL);
+ const char *proto_l2p_index_path;
+ const char *proto_p2l_index_path;
+
+ SVN_ERR(svn_io_file_name_get(&proto_l2p_index_path,
+ context->proto_l2p_index, pool));
+ SVN_ERR(svn_io_file_name_get(&proto_p2l_index_path,
+ context->proto_p2l_index, pool));
+
+ /* finalize proto index files */
+ SVN_ERR(svn_io_file_close(context->proto_l2p_index, pool));
+ SVN_ERR(svn_io_file_close(context->proto_p2l_index, pool));
+
+ /* Create the actual index files*/
+ SVN_ERR(svn_fs_fs__l2p_index_create(context->fs, l2p_index_path,
+ proto_l2p_index_path,
+ context->shard_rev, pool));
+ SVN_ERR(svn_fs_fs__p2l_index_create(context->fs, p2l_index_path,
+ proto_p2l_index_path,
+ context->shard_rev, pool));
+
+ /* remove proto index files */
+ SVN_ERR(svn_io_remove_file2(proto_l2p_index_path, FALSE, pool));
+ SVN_ERR(svn_io_remove_file2(proto_p2l_index_path, FALSE, pool));
+
+ SVN_ERR(svn_io_file_close(context->pack_file, pool));
+
+ return SVN_NO_ERROR;
+};
+
+static svn_error_t *
+copy_file_data(pack_context_t *context,
+ apr_file_t *dest,
+ apr_file_t *source,
+ apr_off_t size,
+ apr_pool_t *pool)
+{
+ /* most non-representation items will be small. Minimize the buffer
+ * and infrastructure overhead in that case. */
+ enum { STACK_BUFFER_SIZE = 1024 };
+
+ if (size < STACK_BUFFER_SIZE)
+ {
+ /* copy small data using a fixed-size buffer on stack */
+ char buffer[STACK_BUFFER_SIZE];
+ SVN_ERR(svn_io_file_read_full2(source, buffer, (apr_size_t)size,
+ NULL, NULL, pool));
+ SVN_ERR(svn_io_file_write_full(dest, buffer, (apr_size_t)size,
+ NULL, pool));
+ }
+ else
+ {
+ /* using streaming copies for larger data blocks. That may require
+ * the allocation of larger buffers and we should make sure that
+ * this extra memory is released asap. */
+ fs_fs_data_t *ffd = context->fs->fsap_data;
+ apr_pool_t *copypool = svn_pool_create(pool);
+ char *buffer = apr_palloc(copypool, ffd->block_size);
+
+ while (size)
+ {
+ apr_size_t to_copy = (apr_size_t)(MIN(size, ffd->block_size));
+ if (context->cancel_func)
+ SVN_ERR(context->cancel_func(context->cancel_baton));
+
+ SVN_ERR(svn_io_file_read_full2(source, buffer, to_copy,
+ NULL, NULL, pool));
+ SVN_ERR(svn_io_file_write_full(dest, buffer, to_copy,
+ NULL, pool));
+
+ size -= to_copy;
+ }
+
+ svn_pool_destroy(copypool);
+ }
+
+ return SVN_NO_ERROR;
+}
+
+static svn_error_t *
+copy_item_to_temp(pack_context_t *context,
+ apr_array_header_t *entries,
+ apr_file_t *temp_file,
+ apr_file_t *rev_file,
+ svn_fs_fs__p2l_entry_t *entry,
+ apr_pool_t *pool)
+{
+ svn_fs_fs__p2l_entry_t *new_entry = apr_palloc(context->info_pool,
+ sizeof(*new_entry));
+ *new_entry = *entry;
+ new_entry->offset = 0;
+ SVN_ERR(svn_io_file_seek(temp_file, SEEK_CUR, &new_entry->offset, pool));
+ APR_ARRAY_PUSH(entries, svn_fs_fs__p2l_entry_t *) = new_entry;
+
+ SVN_ERR(copy_file_data(context, temp_file, rev_file, entry->size, pool));
+
+ return SVN_NO_ERROR;
+}
+
+static int
+get_item_array_index(pack_context_t *context,
+ svn_revnum_t revision,
+ apr_int64_t item_index)
+{
+ assert(revision >= context->start_rev);
+ return (int)item_index + APR_ARRAY_IDX(context->rev_offsets,
+ revision - context->start_rev,
+ int);
+}
+
+static void
+add_item_rep_mapping(pack_context_t *context,
+ rep_info_t *info)
+{
+ int idx = get_item_array_index(context,
+ info->entry->revision,
+ info->entry->item_index);
+
+ while (context->reps_infos->nelts <= idx)
+ APR_ARRAY_PUSH(context->reps_infos, rep_info_t *) = NULL;
+
+ assert(!APR_ARRAY_IDX(context->reps_infos, idx, rep_info_t *));
+ APR_ARRAY_IDX(context->reps_infos, idx, rep_info_t *) = info;
+}
+
+static svn_error_t *
+copy_rep_to_temp(pack_context_t *context,
+ apr_file_t *rev_file,
+ svn_fs_fs__p2l_entry_t *entry,
+ apr_pool_t *pool)
+{
+ rep_info_t *rep_info = apr_pcalloc(context->info_pool, sizeof(*rep_info));
+ svn_fs_fs__rep_header_t *rep_header;
+ svn_stream_t *stream;
+
+ rep_info->entry = apr_palloc(context->info_pool, sizeof(*rep_info->entry));
+ *rep_info->entry = *entry;
+ rep_info->entry->offset = 0;
+ SVN_ERR(svn_io_file_seek(context->reps_file, SEEK_CUR,
+ &rep_info->entry->offset, pool));
+ add_item_rep_mapping(context, rep_info);
+
+ stream = svn_stream_from_aprfile2(rev_file, TRUE, pool);
+ SVN_ERR(svn_fs_fs__read_rep_header(&rep_header, stream, pool));
+ svn_stream_close(stream);
+
+ if ( rep_header->is_delta
+ && !rep_header->is_delta_vs_empty
+ && rep_header->base_revision >= context->start_rev)
+ {
+ int idx = get_item_array_index(context, rep_header->base_revision,
+ rep_header->base_item_index);
+ if (idx < context->reps_infos->nelts)
+ rep_info->base = APR_ARRAY_IDX(context->reps_infos, idx, rep_info_t *);
+ }
+
+ SVN_ERR(svn_io_file_seek(rev_file, SEEK_SET, &entry->offset, pool));
+ SVN_ERR(copy_file_data(context, context->reps_file, rev_file, entry->size,
+ pool));
+
+ return SVN_NO_ERROR;
+}
+
+static int
+compare_dir_entries(const svn_sort__item_t *a,
+ const svn_sort__item_t *b)
+{
+ const svn_fs_dirent_t *lhs = (const svn_fs_dirent_t *) a->value;
+ const svn_fs_dirent_t *rhs = (const svn_fs_dirent_t *) b->value;
+ svn_revnum_t lhs_revision;
+ svn_revnum_t rhs_revision;
+ apr_int64_t lhs_item_index;
+ apr_int64_t rhs_item_index;
+
+ if (lhs->kind != rhs->kind)
+ return lhs->kind == svn_node_dir ? -1 : 1;
+
+ lhs_revision = svn_fs_fs__id_rev(lhs->id);
+ rhs_revision = svn_fs_fs__id_rev(rhs->id);
+ if (lhs_revision != rhs_revision)
+ return lhs_revision > rhs_revision ? -1 : 1;
+
+ lhs_item_index = svn_fs_fs__id_item(lhs->id);
+ rhs_item_index = svn_fs_fs__id_item(rhs->id);
+ if (lhs_item_index != rhs_item_index)
+ return lhs_item_index > rhs_item_index ? -1 : 1;
+
+ return 0;
+}
+
+static svn_error_t *
+copy_node_to_temp(pack_context_t *context,
+ apr_file_t *rev_file,
+ svn_fs_fs__p2l_entry_t *entry,
+ apr_pool_t *pool)
+{
+ rep_info_t *rep_info = apr_pcalloc(context->info_pool, sizeof(*rep_info));
+ node_revision_t *noderev;
+ svn_stream_t *stream;
+
+ rep_info->entry = apr_palloc(context->info_pool, sizeof(*rep_info->entry));
+ *rep_info->entry = *entry;
+ rep_info->entry->offset = 0;
+ SVN_ERR(svn_io_file_seek(context->reps_file, SEEK_CUR,
+ &rep_info->entry->offset, pool));
+ add_item_rep_mapping(context, rep_info);
+
+ stream = svn_stream_from_aprfile2(rev_file, TRUE, pool);
+ SVN_ERR(svn_fs_fs__read_noderev(&noderev, stream, pool));
+ svn_stream_close(stream);
+
+ if (noderev->data_rep && noderev->data_rep->revision >= context->start_rev)
+ {
+ int idx = get_item_array_index(context, noderev->data_rep->revision,
+ noderev->data_rep->item_index);
+ if (idx < context->reps_infos->nelts)
+ rep_info->base = APR_ARRAY_IDX(context->reps_infos, idx, rep_info_t *);
+ }
+
+ SVN_ERR(svn_io_file_seek(rev_file, SEEK_SET, &entry->offset, pool));
+ SVN_ERR(copy_file_data(context, context->reps_file, rev_file, entry->size,
+ pool));
+
+ if (noderev->kind == svn_node_dir && rep_info->base)
+ {
+ apr_hash_t *directory;
+ apr_pool_t *scratch_pool = svn_pool_create(pool);
+ apr_array_header_t *sorted;
+ int i;
+
+ rep_info = rep_info->base;
+ SVN_ERR(svn_fs_fs__rep_contents_dir(&directory, context->fs, noderev,
+ scratch_pool));
+ sorted = svn_sort__hash(directory, compare_dir_entries, scratch_pool);
+ for (i = 0; i < sorted->nelts; ++i)
+ {
+ svn_fs_dirent_t *dir_entry
+ = APR_ARRAY_IDX(sorted, i, svn_sort__item_t).value;
+ svn_revnum_t revision = svn_fs_fs__id_rev(dir_entry->id);
+ apr_int64_t item_index = svn_fs_fs__id_item(dir_entry->id);
+
+ if (revision >= context->start_rev)
+ {
+ int idx = get_item_array_index(context, revision, item_index);
+ if (idx < context->reps_infos->nelts)
+ {
+ rep_info->next = APR_ARRAY_IDX(context->reps_infos, idx,
+ rep_info_t *);
+ rep_info = rep_info->next;
+ }
+ }
+ }
+
+ svn_pool_destroy(scratch_pool);
+ }
+
+ return SVN_NO_ERROR;
+}
+
+static int
+compare_p2l_info(const svn_fs_fs__p2l_entry_t * const * lhs,
+ const svn_fs_fs__p2l_entry_t * const * rhs)
+{
+ assert(*lhs != *rhs);
+
+ if ((*lhs)->revision == (*rhs)->revision)
+ return (*lhs)->item_index > (*rhs)->item_index ? -1 : 1;
+
+ return (*lhs)->revision > (*rhs)->revision ? -1 : 1;
+}
+
+static void
+sort_items(apr_array_header_t *entries)
+{
+ qsort(entries->elts, entries->nelts, entries->elt_size,
+ (int (*)(const void *, const void *))compare_p2l_info);
+}
+
+static int
+compare_p2l_info_rev(const svn_fs_fs__p2l_entry_t * const * lhs,
+ const svn_fs_fs__p2l_entry_t * const * rhs)
+{
+ assert(*lhs != *rhs);
+
+ if ((*lhs)->revision == (*rhs)->revision)
+ return 0;
+
+ return (*lhs)->revision < (*rhs)->revision ? -1 : 1;
+}
+
+static void
+sort_by_rev(apr_array_header_t *entries)
+{
+ qsort(entries->elts, entries->nelts, entries->elt_size,
+ (int (*)(const void *, const void *))compare_p2l_info_rev);
+}
+
+static void
+pick_recursively(pack_context_t *context,
+ rep_info_t *info)
+{
+ rep_info_t *temp;
+ do
+ {
+ if (info->entry)
+ {
+ APR_ARRAY_PUSH(context->reps, svn_fs_fs__p2l_entry_t *)
+ = info->entry;
+ info->entry = NULL;
+ }
+
+ if (info->base)
+ {
+ pick_recursively(context, info->base);
+ info->base = NULL;
+ }
+
+ temp = info->next;
+ info->next = NULL;
+ info = temp;
+ }
+ while (info);
+}
+
+static void
+sort_reps(pack_context_t *context)
+{
+ int i;
+ for (i = context->reps_infos->nelts - 1; i >= 0; --i)
+ {
+ rep_info_t *info = APR_ARRAY_IDX(context->reps_infos, i, rep_info_t *);
+ if ( info
+ && info->entry
+ && info->entry->item_index == SVN_FS_FS__ITEM_INDEX_ROOT_NODE)
+ do
+ {
+ APR_ARRAY_PUSH(context->reps, svn_fs_fs__p2l_entry_t *)
+ = info->entry;
+ info->entry = NULL;
+ info = info->base;
+ }
+ while (info && info->entry);
+ }
+
+ for (i = context->reps_infos->nelts - 1; i >= 0; --i)
+ {
+ rep_info_t *info = APR_ARRAY_IDX(context->reps_infos, i, rep_info_t *);
+ if (info && info->entry == NULL && info->next)
+ pick_recursively(context, info);
+ }
+
+ for (i = context->reps_infos->nelts - 1; i >= 0; --i)
+ {
+ rep_info_t *info = APR_ARRAY_IDX(context->reps_infos, i, rep_info_t *);
+ if (info && info->entry)
+ pick_recursively(context, info);
+ }
+}
+
+static svn_error_t *
+copy_items_from_temp(pack_context_t *context,
+ apr_array_header_t *entries,
+ apr_file_t *temp_file,
+ apr_pool_t *pool)
+{
+ apr_pool_t *iterpool = svn_pool_create(pool);
+ int i;
+ for (i = 0; i < entries->nelts; ++i)
+ {
+ svn_fs_fs__p2l_entry_t *entry
+ = APR_ARRAY_IDX(entries, i, svn_fs_fs__p2l_entry_t *);
+
+ SVN_ERR(svn_io_file_seek(temp_file, SEEK_SET, &entry->offset,
+ iterpool));
+ SVN_ERR(copy_file_data(context, context->pack_file, temp_file,
+ entry->size, pool));
+
+ entry->offset = context->pack_offset;
+ context->pack_offset += entry->size;
+
+ SVN_ERR(svn_fs_fs__p2l_proto_index_add_entry
+ (context->proto_p2l_index, entry, iterpool));
+ }
+
+ svn_pool_destroy(iterpool);
+
+ return SVN_NO_ERROR;
+}
+
+static void
+append_entries(apr_array_header_t *dest,
+ apr_array_header_t *to_append)
+{
+ int i;
+ for (i = 0; i < to_append->nelts; ++i)
+ APR_ARRAY_PUSH(dest, svn_fs_fs__p2l_entry_t *)
+ = APR_ARRAY_IDX(to_append, i, svn_fs_fs__p2l_entry_t *);
+}
+
+static svn_error_t *
+write_l2p_index(pack_context_t *context,
+ apr_pool_t *pool)
+{
+ apr_pool_t *iterpool = svn_pool_create(pool);
+ svn_revnum_t prev_rev = SVN_INVALID_REVNUM;
+ int i;
+
+ append_entries(context->reps, context->changes);
+ append_entries(context->reps, context->file_props);
+ append_entries(context->reps, context->dir_props);
+ sort_by_rev(context->reps);
+
+ for (i = 0; i < context->reps->nelts; ++i)
+ {
+ svn_fs_fs__p2l_entry_t *entry
+ = APR_ARRAY_IDX(context->reps, i, svn_fs_fs__p2l_entry_t *);
+
+ if (prev_rev != entry->revision)
+ {
+ prev_rev = entry->revision;
+ SVN_ERR(svn_fs_fs__l2p_proto_index_add_revision
+ (context->proto_l2p_index, iterpool));
+ }
+
+ SVN_ERR(svn_fs_fs__l2p_proto_index_add_entry
+ (context->proto_l2p_index,
+ entry->offset, entry->item_index, iterpool));
+
+ if (i % 256 == 0)
+ svn_pool_clear(iterpool);
+ }
+
+ svn_pool_destroy(iterpool);
+
+ return SVN_NO_ERROR;
+}
+
+static svn_error_t *
+pack_section(pack_context_t *context,
+ apr_pool_t *pool)
+{
+ apr_pool_t *revpool = svn_pool_create(pool);
+ apr_pool_t *iterpool = svn_pool_create(pool);
+
+ svn_revnum_t revision;
+ for (revision = context->start_rev; revision < context->end_rev; ++revision)
+ {
+ apr_off_t offset = 0;
+ apr_finfo_t finfo;
+ apr_file_t *rev_file;
+
+ /* Get the size of the file. */
+ const char *path = svn_dirent_join(context->shard_dir,
+ apr_psprintf(revpool, "%ld",
+ revision),
+ revpool);
+ SVN_ERR(svn_io_stat(&finfo, path, APR_FINFO_SIZE, revpool));
+
+ SVN_ERR(svn_io_file_open(&rev_file, path,
+ APR_READ | APR_BUFFERED | APR_BINARY,
+ APR_OS_DEFAULT, revpool));
+
+ APR_ARRAY_PUSH(context->rev_offsets, int) = context->reps_infos->nelts;
+
+ /* read the phys-to-log index file until we covered the whole rev file.
+ * That index contains enough info to build both target indexes from it.
*/
+ while (offset < finfo.size)
+ {
+ /* read one cluster */
+ int i;
+ apr_array_header_t *entries;
+ SVN_ERR(svn_fs_fs__p2l_index_lookup(&entries, context->fs,
+ revision, offset,
+ iterpool));
+
+ for (i = 0; i < entries->nelts; ++i)
+ {
+ svn_fs_fs__p2l_entry_t *entry
+ = &APR_ARRAY_IDX(entries, i, svn_fs_fs__p2l_entry_t);
+
+ /* skip first entry if that was duplicated due crossing a
+ cluster boundary */
+ if (offset > entry->offset)
+ continue;
+
+ /* process entry while inside the rev file */
+ offset = entry->offset;
+ if (offset < finfo.size)
+ {
+ SVN_ERR(svn_io_file_seek(rev_file, SEEK_SET, &offset,
+ iterpool));
+
+ if (entry->type == SVN_FS_FS__ITEM_TYPE_CHANGES)
+ SVN_ERR(copy_item_to_temp(context,
+ context->changes,
+ context->changes_file,
+ rev_file, entry, iterpool));
+ else if (entry->type == SVN_FS_FS__ITEM_TYPE_FILE_PROPS)
+ SVN_ERR(copy_item_to_temp(context,
+ context->file_props,
+ context->file_props_file,
+ rev_file, entry, iterpool));
+ else if (entry->type == SVN_FS_FS__ITEM_TYPE_DIR_PROPS)
+ SVN_ERR(copy_item_to_temp(context,
+ context->dir_props,
+ context->dir_props_file,
+ rev_file, entry, iterpool));
+ else if ( entry->type == SVN_FS_FS__ITEM_TYPE_FILE_REP
+ || entry->type == SVN_FS_FS__ITEM_TYPE_DIR_REP)
+ SVN_ERR(copy_rep_to_temp(context, rev_file, entry,
+ iterpool));
+ else if (entry->type == SVN_FS_FS__ITEM_TYPE_NODEREV)
+ SVN_ERR(copy_node_to_temp(context, rev_file, entry,
+ iterpool));
+ else
+ SVN_ERR_ASSERT(entry->type == SVN_FS_FS__ITEM_TYPE_UNUSED);
+
+ offset += entry->size;
+ }
+ }
+
+ if (context->cancel_func)
+ SVN_ERR(context->cancel_func(context->cancel_baton));
+
+ svn_pool_clear(iterpool);
+ }
+
+ svn_pool_clear(revpool);
+ }
+
+ svn_pool_destroy(iterpool);
+
+ sort_items(context->changes);
+ sort_items(context->file_props);
+ sort_items(context->dir_props);
+ sort_reps(context);
+
+ SVN_ERR(copy_items_from_temp(context, context->changes,
+ context->changes_file, revpool));
+ svn_pool_clear(revpool);
+ SVN_ERR(copy_items_from_temp(context, context->file_props,
+ context->file_props_file, revpool));
+ svn_pool_clear(revpool);
+ SVN_ERR(copy_items_from_temp(context, context->dir_props,
+ context->dir_props_file, revpool));
+ svn_pool_clear(revpool);
+ SVN_ERR(copy_items_from_temp(context, context->reps,
+ context->reps_file, revpool));
+ svn_pool_clear(revpool);
+ SVN_ERR(write_l2p_index(context, revpool));
+
+ svn_pool_destroy(revpool);
+
+ return SVN_NO_ERROR;
+}
+
+static svn_error_t *
+append_revision(pack_context_t *context,
+ apr_pool_t *pool)
+{
+ apr_off_t offset = 0;
+ apr_pool_t *iterpool = svn_pool_create(pool);
+ apr_file_t *rev_file;
+ apr_finfo_t finfo;
+
+ /* Get the size of the file. */
+ const char *path = svn_dirent_join(context->shard_dir,
+ apr_psprintf(iterpool, "%ld",
+ context->start_rev),
+ pool);
+ SVN_ERR(svn_io_stat(&finfo, path, APR_FINFO_SIZE, pool));
+
+ /* Copy all the bits from the rev file to the end of the pack file. */
+ SVN_ERR(svn_io_file_open(&rev_file, path,
+ APR_READ | APR_BUFFERED | APR_BINARY,
+ APR_OS_DEFAULT, pool));
+ SVN_ERR(copy_file_data(context, context->pack_file, rev_file, finfo.size,
+ iterpool));
+
+ /* mark the start of a new revision */
+ SVN_ERR(svn_fs_fs__l2p_proto_index_add_revision(context->proto_l2p_index,
+ pool));
+
+ /* read the phys-to-log index file until we covered the whole rev file.
+ * That index contains enough info to build both target indexes from it. */
+ while (offset < finfo.size)
+ {
+ /* read one cluster */
+ int i;
+ apr_array_header_t *entries;
+ SVN_ERR(svn_fs_fs__p2l_index_lookup(&entries, context->fs,
+ context->start_rev, offset,
+ iterpool));
+
+ for (i = 0; i < entries->nelts; ++i)
+ {
+ svn_fs_fs__p2l_entry_t *entry
+ = &APR_ARRAY_IDX(entries, i, svn_fs_fs__p2l_entry_t);
+
+ /* skip first entry if that was duplicated due crossing a
+ cluster boundary */
+ if (offset > entry->offset)
+ continue;
+
+ /* process entry while inside the rev file */
+ offset = entry->offset;
+ if (offset < finfo.size)
+ {
+ entry->offset += context->pack_offset;
+ offset += entry->size;
+ SVN_ERR(svn_fs_fs__l2p_proto_index_add_entry
+ (context->proto_l2p_index,
+ entry->offset, entry->item_index, iterpool));
+ SVN_ERR(svn_fs_fs__p2l_proto_index_add_entry
+ (context->proto_p2l_index, entry, iterpool));
+ }
+ }
+
+ svn_pool_clear(iterpool);
+ }
+
+ svn_pool_destroy(iterpool);
+ context->pack_offset += finfo.size;
+
+ return SVN_NO_ERROR;
+}
+
+static svn_error_t *
+pack_log_addressed(svn_fs_t *fs,
+ const char *pack_file_dir,
+ const char *shard_dir,
+ svn_revnum_t shard_rev,
+ apr_size_t max_mem,
+ svn_cancel_func_t cancel_func,
+ void *cancel_baton,
+ apr_pool_t *pool)
+{
+ enum
+ {
+ /* estimated amount of memory used to represent one item in memory
+ * during rev file packing */
+ PER_ITEM_MEM = APR_ALIGN_DEFAULT(sizeof(rep_info_t))
+ + APR_ALIGN_DEFAULT(sizeof(svn_fs_fs__p2l_entry_t))
+ + 6 * sizeof(void*)
+ };
+
+ apr_size_t max_items = max_mem / PER_ITEM_MEM;
+ apr_array_header_t *max_ids;
+ pack_context_t context = { 0 };
+ int i;
+ apr_size_t item_count = 0;
+
+ SVN_ERR(initialize_pack_context(&context, fs, pack_file_dir, shard_dir,
+ shard_rev, max_items, cancel_func,
+ cancel_baton, pool));
+
+ SVN_ERR(svn_fs_fs__l2p_get_max_ids(&max_ids, fs, shard_rev,
+ context.shard_end_rev - shard_rev,
+ pool));
+
+ for (i = 0; i < max_ids->nelts; ++i)
+ if (APR_ARRAY_IDX(max_ids, i, apr_uint64_t) + item_count <= max_items)
+ {
+ context.end_rev++;
+ }
+ else
+ {
+ if (context.start_rev < context.end_rev)
+ {
+ SVN_ERR(pack_section(&context, pool));
+ SVN_ERR(reset_pack_context(&context, pool));
+ item_count = 0;
+ }
+
+ context.start_rev = i + context.shard_rev;
+ context.end_rev = context.start_rev + 1;
+
+ if (APR_ARRAY_IDX(max_ids, i, apr_uint64_t) > max_items)
+ {
+ SVN_ERR(append_revision(&context, pool));
+ context.start_rev++;
+ }
+ else
+ item_count += (apr_size_t)APR_ARRAY_IDX(max_ids, i, apr_uint64_t);
+ }
+
+ if (context.start_rev < context.end_rev)
+ SVN_ERR(pack_section(&context, pool));
+
+ SVN_ERR(reset_pack_context(&context, pool));
+ SVN_ERR(close_pack_context(&context, pool));
+
+ return SVN_NO_ERROR;
+}
+
/* Given REV in FS, set *REV_OFFSET to REV's offset in the packed file.
Use POOL for temporary allocations. */
svn_error_t *
@@ -98,66 +945,70 @@ svn_fs_fs__get_packed_offset(apr_off_t *
return svn_cache__set(ffd->packed_offset_cache, &shard, manifest, pool);
}
-/* Copy the index information from the unpacked revision REV in FS to the
- * PROTO_L2P_INDEX and PROTO_P2L_INDEX proto index files, respectively.
- * Assume that the rev file will be appended to the pack file at offset
- * PACK_OFFSET and that the unpacked rev file contains FILE_SIZE bytes.
- * Use POOL for allocations.
- */
static svn_error_t *
-copy_indexes(svn_fs_t *fs,
- apr_file_t *proto_l2p_index,
- apr_file_t *proto_p2l_index,
- svn_revnum_t rev,
- apr_off_t pack_offset,
- apr_off_t file_size,
- apr_pool_t *pool)
+pack_phys_addressed(svn_fs_t *fs,
+ const char *pack_file_dir,
+ const char *shard_path,
+ svn_revnum_t start_rev,
+ int max_files_per_dir,
+ svn_cancel_func_t cancel_func,
+ void *cancel_baton,
+ apr_pool_t *pool)
{
- apr_off_t offset = 0;
- apr_pool_t *iterpool = svn_pool_create(pool);
+ const char *pack_file_path, *manifest_file_path;
+ svn_stream_t *pack_stream, *manifest_stream;
+ svn_revnum_t end_rev, rev;
+ apr_off_t next_offset;
+ apr_pool_t *iterpool;
- /* mark the start of a new revision */
- SVN_ERR(svn_fs_fs__l2p_proto_index_add_revision(proto_l2p_index, pool));
+ /* Some useful paths. */
+ pack_file_path = svn_dirent_join(pack_file_dir, PATH_PACKED, pool);
+ manifest_file_path = svn_dirent_join(pack_file_dir, PATH_MANIFEST, pool);
- /* read the phys-to-log index file until we covered the whole rev file.
- * That index contains enough info to build both target indexes from it. */
- while (offset < file_size)
- {
- /* read one cluster */
- int i;
- apr_array_header_t *entries;
- SVN_ERR(svn_fs_fs__p2l_index_lookup(&entries, fs, rev, offset,
- iterpool));
+ /* Create the new directory and pack file. */
+ SVN_ERR(svn_stream_open_writable(&pack_stream, pack_file_path, pool,
+ pool));
- for (i = 0; i < entries->nelts; ++i)
- {
- svn_fs_fs__p2l_entry_t *entry
- = &APR_ARRAY_IDX(entries, i, svn_fs_fs__p2l_entry_t);
+ /* Create the manifest file. */
+ SVN_ERR(svn_stream_open_writable(&manifest_stream, manifest_file_path,
+ pool, pool));
- /* skip first entry if that was duplicated due crossing a
- cluster boundary */
- if (offset > entry->offset)
- continue;
+ end_rev = start_rev + max_files_per_dir - 1;
+ next_offset = 0;
+ iterpool = svn_pool_create(pool);
- /* process entry while inside the rev file */
- offset = entry->offset;
- if (offset < file_size)
- {
- entry->offset += pack_offset;
- offset += entry->size;
- SVN_ERR(svn_fs_fs__l2p_proto_index_add_entry(proto_l2p_index,
- entry->offset,
- entry->item_index,
- iterpool));
- SVN_ERR(svn_fs_fs__p2l_proto_index_add_entry(proto_p2l_index,
- entry,
- iterpool));
- }
- }
+ /* Iterate over the revisions in this shard, squashing them together. */
+ for (rev = start_rev; rev <= end_rev; rev++)
+ {
+ svn_stream_t *rev_stream;
+ apr_finfo_t finfo;
+ const char *path;
svn_pool_clear(iterpool);
+
+ /* Get the size of the file. */
+ path = svn_dirent_join(shard_path, apr_psprintf(iterpool, "%ld", rev),
+ iterpool);
+ SVN_ERR(svn_io_stat(&finfo, path, APR_FINFO_SIZE, iterpool));
+
+ /* build manifest */
+ SVN_ERR(svn_stream_printf(manifest_stream, iterpool,
+ "%" APR_OFF_T_FMT "\n", next_offset));
+ next_offset += finfo.size;
+
+ /* Copy all the bits from the rev file to the end of the pack file. */
+ SVN_ERR(svn_stream_open_readonly(&rev_stream, path, iterpool, iterpool));
+ SVN_ERR(svn_stream_copy3(rev_stream, svn_stream_disown(pack_stream,
+ iterpool),
+ cancel_func, cancel_baton, iterpool));
}
+ /* disallow write access to the manifest file */
+ SVN_ERR(svn_stream_close(manifest_stream));
+ SVN_ERR(svn_io_set_file_read_only(manifest_file_path, FALSE, iterpool));
+
+ SVN_ERR(svn_stream_close(pack_stream));
+
svn_pool_destroy(iterpool);
return SVN_NO_ERROR;
@@ -176,29 +1027,17 @@ pack_rev_shard(svn_fs_t *fs,
const char *shard_path,
apr_int64_t shard,
int max_files_per_dir,
+ apr_size_t max_mem,
svn_cancel_func_t cancel_func,
void *cancel_baton,
apr_pool_t *pool)
{
fs_fs_data_t *ffd = fs->fsap_data;
- const char *pack_file_path, *manifest_file_path;
- const char *proto_l2p_index_path, *proto_p2l_index_path;
- const char *l2p_index_path, *p2l_index_path;
- svn_stream_t *pack_stream, *manifest_stream;
- svn_revnum_t start_rev, end_rev, rev;
- apr_off_t next_offset;
- apr_pool_t *iterpool;
- apr_file_t *proto_l2p_index, *proto_p2l_index;
+ const char *pack_file_path;
+ svn_revnum_t shard_rev = (svn_revnum_t) (shard * max_files_per_dir);
/* Some useful paths. */
pack_file_path = svn_dirent_join(pack_file_dir, PATH_PACKED, pool);
- manifest_file_path = svn_dirent_join(pack_file_dir, PATH_MANIFEST, pool);
- l2p_index_path = apr_pstrcat(pool, pack_file_path, PATH_EXT_L2P_INDEX, NULL);
- p2l_index_path = apr_pstrcat(pool, pack_file_path, PATH_EXT_P2L_INDEX, NULL);
- proto_l2p_index_path = svn_dirent_join(pack_file_dir,
- PATH_INDEX PATH_EXT_L2P_INDEX, pool);
- proto_p2l_index_path = svn_dirent_join(pack_file_dir,
- PATH_INDEX PATH_EXT_P2L_INDEX, pool);
/* Remove any existing pack file for this shard, since it is incomplete. */
SVN_ERR(svn_io_remove_dir2(pack_file_dir, TRUE, cancel_func, cancel_baton,
@@ -206,91 +1045,18 @@ pack_rev_shard(svn_fs_t *fs,
/* Create the new directory and pack file. */
SVN_ERR(svn_io_dir_make(pack_file_dir, APR_OS_DEFAULT, pool));
- SVN_ERR(svn_stream_open_writable(&pack_stream, pack_file_path, pool,
- pool));
/* Index information files */
if (ffd->format >= SVN_FS_FS__MIN_LOG_ADDRESSING_FORMAT)
- {
- /* Create proto index files*/
- SVN_ERR(svn_fs_fs__l2p_proto_index_open(&proto_l2p_index,
- proto_l2p_index_path, pool));
- SVN_ERR(svn_fs_fs__p2l_proto_index_open(&proto_p2l_index,
- proto_p2l_index_path, pool));
- }
+ SVN_ERR(pack_log_addressed(fs, pack_file_dir, shard_path, shard_rev,
+ max_mem, cancel_func, cancel_baton, pool));
else
- {
- /* Create the manifest file. */
- SVN_ERR(svn_stream_open_writable(&manifest_stream, manifest_file_path,
- pool, pool));
- }
+ SVN_ERR(pack_phys_addressed(fs, pack_file_dir, shard_path, shard_rev,
+ max_files_per_dir, cancel_func,
+ cancel_baton, pool));
- start_rev = (svn_revnum_t) (shard * max_files_per_dir);
- end_rev = (svn_revnum_t) ((shard + 1) * (max_files_per_dir) - 1);
- next_offset = 0;
- iterpool = svn_pool_create(pool);
-
- /* Iterate over the revisions in this shard, squashing them together. */
- for (rev = start_rev; rev <= end_rev; rev++)
- {
- svn_stream_t *rev_stream;
- apr_finfo_t finfo;
- const char *path;
-
- svn_pool_clear(iterpool);
-
- /* Get the size of the file. */
- path = svn_dirent_join(shard_path, apr_psprintf(iterpool, "%ld", rev),
- iterpool);
- SVN_ERR(svn_io_stat(&finfo, path, APR_FINFO_SIZE, iterpool));
-
- /* build indexes / manifest */
- if (ffd->format >= SVN_FS_FS__MIN_LOG_ADDRESSING_FORMAT)
- SVN_ERR(copy_indexes(fs, proto_l2p_index, proto_p2l_index,
- rev, next_offset, finfo.size, pool));
- else
- SVN_ERR(svn_stream_printf(manifest_stream, iterpool,
- "%" APR_OFF_T_FMT "\n", next_offset));
- next_offset += finfo.size;
-
- /* Copy all the bits from the rev file to the end of the pack file. */
- SVN_ERR(svn_stream_open_readonly(&rev_stream, path, iterpool, iterpool));
- SVN_ERR(svn_stream_copy3(rev_stream, svn_stream_disown(pack_stream,
- iterpool),
- cancel_func, cancel_baton, iterpool));
- }
-
- /* Finalize Index information files */
- if (ffd->format >= SVN_FS_FS__MIN_LOG_ADDRESSING_FORMAT)
- {
- /* finalize proto index files */
- SVN_ERR(svn_io_file_close(proto_l2p_index, iterpool));
- SVN_ERR(svn_io_file_close(proto_p2l_index, iterpool));
-
- /* Create the actual index files*/
- SVN_ERR(svn_fs_fs__l2p_index_create(fs, l2p_index_path,
- proto_l2p_index_path,
- start_rev, iterpool));
- SVN_ERR(svn_fs_fs__p2l_index_create(fs, p2l_index_path,
- proto_p2l_index_path,
- start_rev, iterpool));
-
- /* remove proto index files */
- SVN_ERR(svn_io_remove_file2(proto_l2p_index_path, FALSE, iterpool));
- SVN_ERR(svn_io_remove_file2(proto_p2l_index_path, FALSE, iterpool));
- }
- else
- {
- /* disallow write access to the manifest file */
- SVN_ERR(svn_stream_close(manifest_stream));
- SVN_ERR(svn_io_set_file_read_only(manifest_file_path, FALSE, iterpool));
- }
-
- SVN_ERR(svn_stream_close(pack_stream));
- SVN_ERR(svn_io_copy_perms(shard_path, pack_file_dir, iterpool));
- SVN_ERR(svn_io_set_file_read_only(pack_file_path, FALSE, iterpool));
-
- svn_pool_destroy(iterpool);
+ SVN_ERR(svn_io_copy_perms(shard_path, pack_file_dir, pool));
+ SVN_ERR(svn_io_set_file_read_only(pack_file_path, FALSE, pool));
return SVN_NO_ERROR;
}
@@ -341,7 +1107,7 @@ pack_shard(const char *revs_dir,
/* pack the revision content */
SVN_ERR(pack_rev_shard(fs, rev_pack_file_dir, rev_shard_path,
- shard, max_files_per_dir,
+ shard, max_files_per_dir, 64 * 1024 * 1024,
cancel_func, cancel_baton, pool));
/* if enabled, pack the revprops in an equivalent way */