Author: stefan2
Date: Fri Aug 9 10:43:49 2013
New Revision: 1512244
URL: http://svn.apache.org/r1512244
Log:
On the log-addressing branch: Add minimal packing support for logically
addressed shards to PASS more tests.
This is not transient code but will always be needed as a fallback for
extremly large revisions. It also introduces the "pack context"
infrastructure. Later revisions will add reordering logic.
* subversion/libsvn_fs_fs/pack.c
(pack_context_t
initialize_pack_context,
reset_pack_context,
close_pack_context): introduce packing context context infrastructure
(copy_file_data): utility to efficiently (space and time) copy a data
block from one file to another
(append_revision,
pack_log_addressed): non-reordering packing logic log. addressing
(pack_phys_addressed): tweak docstring
(pack_rev_shard): call the new logic for logically addressed shards
Modified:
subversion/branches/log-addressing/subversion/libsvn_fs_fs/pack.c
Modified: subversion/branches/log-addressing/subversion/libsvn_fs_fs/pack.c
URL:
http://svn.apache.org/viewvc/subversion/branches/log-addressing/subversion/libsvn_fs_fs/pack.c?rev=1512244&r1=1512243&r2=1512244&view=diff
==============================================================================
--- subversion/branches/log-addressing/subversion/libsvn_fs_fs/pack.c (original)
+++ subversion/branches/log-addressing/subversion/libsvn_fs_fs/pack.c Fri Aug
9 10:43:49 2013
@@ -32,6 +32,7 @@
#include "pack.h"
#include "util.h"
#include "id.h"
+#include "index.h"
#include "low_level.h"
#include "revprops.h"
#include "transaction.h"
@@ -41,6 +42,230 @@
#include "svn_private_config.h"
#include "temp_serializer.h"
+/* This structure keeps track of all the temporary data and status that
+ * needs to be kept around during the creation of one pack file. After
+ * each revision range (in case we can't process all revs at once due to
+ * memory restrictions), parts of the data will get re-initialized.
+ */
+typedef struct pack_context_t
+{
+ /* file system that we operate on */
+ svn_fs_t *fs;
+
+ /* cancel function to invoke at regular intervals. May be NULL */
+ svn_cancel_func_t cancel_func;
+
+ /* baton to pass to CANCEL_FUNC */
+ void *cancel_baton;
+
+ /* first revision in the shard (and future pack file) */
+ svn_revnum_t shard_rev;
+
+ /* first revision in the range to process (>= SHARD_REV) */
+ svn_revnum_t start_rev;
+
+ /* first revision after the range to process (<= SHARD_END_REV) */
+ svn_revnum_t end_rev;
+
+ /* first revision after the current shard */
+ svn_revnum_t shard_end_rev;
+
+ /* log-to-phys proto index for the whole pack file */
+ apr_file_t *proto_l2p_index;
+
+ /* phys-to-log proto index for the whole pack file */
+ apr_file_t *proto_p2l_index;
+
+ /* full shard directory path (containing the unpacked revisions) */
+ const char *shard_dir;
+
+ /* full packed shard directory path (containing the pack file + indexes) */
+ const char *pack_file_dir;
+
+ /* full pack file path (including PACK_FILE_DIR) */
+ const char *pack_file_path;
+
+ /* current write position (i.e. file length) in the pack file */
+ apr_off_t pack_offset;
+
+ /* the pack file to ultimately write all data to */
+ apr_file_t *pack_file;
+
+ /* pool used for temporary data structures that will be cleaned up when
+ * the next range of revisions is being processed */
+ apr_pool_t *info_pool;
+} pack_context_t;
+
+/* Create and initialize a new pack context for packing shard SHARD_REV in
+ * SHARD_DIR into PACK_FILE_DIR within filesystem FS. Allocate it in POOL
+ * and return the structure in *CONTEXT.
+ *
+ * Limit the number of items being copied per iteration to MAX_ITEMS.
+ * Set CANCEL_FUNC and CANCEL_BATON as well.
+ */
+static svn_error_t *
+initialize_pack_context(pack_context_t *context,
+ svn_fs_t *fs,
+ const char *pack_file_dir,
+ const char *shard_dir,
+ svn_revnum_t shard_rev,
+ svn_cancel_func_t cancel_func,
+ void *cancel_baton,
+ apr_pool_t *pool)
+{
+ fs_fs_data_t *ffd = fs->fsap_data;
+ const char *temp_dir;
+
+ SVN_ERR_ASSERT(ffd->format >= SVN_FS_FS__MIN_LOG_ADDRESSING_FORMAT);
+ SVN_ERR_ASSERT(shard_rev % ffd->max_files_per_dir == 0);
+
+ /* where we will place our various temp files */
+ SVN_ERR(svn_io_temp_dir(&temp_dir, pool));
+
+ /* store parameters */
+ context->fs = fs;
+ context->cancel_func = cancel_func;
+ context->cancel_baton = cancel_baton;
+
+ context->shard_rev = shard_rev;
+ context->start_rev = shard_rev;
+ context->end_rev = shard_rev;
+ context->shard_end_rev = shard_rev + ffd->max_files_per_dir;
+
+ /* Create the new directory and pack file. */
+ context->shard_dir = shard_dir;
+ context->pack_file_dir = pack_file_dir;
+ context->pack_file_path
+ = svn_dirent_join(pack_file_dir, PATH_PACKED, pool);
+ SVN_ERR(svn_io_file_open(&context->pack_file, context->pack_file_path,
+ APR_WRITE | APR_BUFFERED | APR_BINARY | APR_EXCL
+ | APR_CREATE, APR_OS_DEFAULT, pool));
+
+ /* Proto index files */
+ SVN_ERR(svn_fs_fs__l2p_proto_index_open
+ (&context->proto_l2p_index,
+ svn_dirent_join(pack_file_dir,
+ PATH_INDEX PATH_EXT_L2P_INDEX,
+ pool),
+ pool));
+ SVN_ERR(svn_fs_fs__p2l_proto_index_open
+ (&context->proto_p2l_index,
+ svn_dirent_join(pack_file_dir,
+ PATH_INDEX PATH_EXT_P2L_INDEX,
+ pool),
+ pool));
+
+ /* the pool used for temp structures */
+ context->info_pool = svn_pool_create(pool);
+
+ return SVN_NO_ERROR;
+};
+
+/* Clean up / free all revision range specific data and files in CONTEXT.
+ * Use POOL for temporary allocations.
+ */
+static svn_error_t *
+reset_pack_context(pack_context_t *context,
+ apr_pool_t *pool)
+{
+ svn_pool_clear(context->info_pool);
+
+ return SVN_NO_ERROR;
+};
+
+/* Call this after the last revision range. It will finalize all index files
+ * for CONTEXT and close any open files. Use POOL for temporary allocations.
+ */
+static svn_error_t *
+close_pack_context(pack_context_t *context,
+ apr_pool_t *pool)
+{
+ const char *l2p_index_path
+ = apr_pstrcat(pool, context->pack_file_path, PATH_EXT_L2P_INDEX, NULL);
+ const char *p2l_index_path
+ = apr_pstrcat(pool, context->pack_file_path, PATH_EXT_P2L_INDEX, NULL);
+ const char *proto_l2p_index_path;
+ const char *proto_p2l_index_path;
+
+ /* need the file names for the actual index creation call further down */
+ SVN_ERR(svn_io_file_name_get(&proto_l2p_index_path,
+ context->proto_l2p_index, pool));
+ SVN_ERR(svn_io_file_name_get(&proto_p2l_index_path,
+ context->proto_p2l_index, pool));
+
+ /* finalize proto index files */
+ SVN_ERR(svn_io_file_close(context->proto_l2p_index, pool));
+ SVN_ERR(svn_io_file_close(context->proto_p2l_index, pool));
+
+ /* Create the actual index files*/
+ SVN_ERR(svn_fs_fs__l2p_index_create(context->fs, l2p_index_path,
+ proto_l2p_index_path,
+ context->shard_rev, pool));
+ SVN_ERR(svn_fs_fs__p2l_index_create(context->fs, p2l_index_path,
+ proto_p2l_index_path,
+ context->shard_rev, pool));
+
+ /* remove proto index files */
+ SVN_ERR(svn_io_remove_file2(proto_l2p_index_path, FALSE, pool));
+ SVN_ERR(svn_io_remove_file2(proto_p2l_index_path, FALSE, pool));
+
+ SVN_ERR(svn_io_file_close(context->pack_file, pool));
+
+ return SVN_NO_ERROR;
+};
+
+/* Efficiently copy SIZE bytes from SOURCE to DEST. Invoke the CANCEL_FUNC
+ * from CONTEXT at regular intervals. Use POOL for allocations.
+ */
+static svn_error_t *
+copy_file_data(pack_context_t *context,
+ apr_file_t *dest,
+ apr_file_t *source,
+ apr_off_t size,
+ apr_pool_t *pool)
+{
+ /* most non-representation items will be small. Minimize the buffer
+ * and infrastructure overhead in that case. */
+ enum { STACK_BUFFER_SIZE = 1024 };
+
+ if (size < STACK_BUFFER_SIZE)
+ {
+ /* copy small data using a fixed-size buffer on stack */
+ char buffer[STACK_BUFFER_SIZE];
+ SVN_ERR(svn_io_file_read_full2(source, buffer, (apr_size_t)size,
+ NULL, NULL, pool));
+ SVN_ERR(svn_io_file_write_full(dest, buffer, (apr_size_t)size,
+ NULL, pool));
+ }
+ else
+ {
+ /* use streaming copies for larger data blocks. That may require
+ * the allocation of larger buffers and we should make sure that
+ * this extra memory is released asap. */
+ fs_fs_data_t *ffd = context->fs->fsap_data;
+ apr_pool_t *copypool = svn_pool_create(pool);
+ char *buffer = apr_palloc(copypool, ffd->block_size);
+
+ while (size)
+ {
+ apr_size_t to_copy = (apr_size_t)(MIN(size, ffd->block_size));
+ if (context->cancel_func)
+ SVN_ERR(context->cancel_func(context->cancel_baton));
+
+ SVN_ERR(svn_io_file_read_full2(source, buffer, to_copy,
+ NULL, NULL, pool));
+ SVN_ERR(svn_io_file_write_full(dest, buffer, to_copy,
+ NULL, pool));
+
+ size -= to_copy;
+ }
+
+ svn_pool_destroy(copypool);
+ }
+
+ return SVN_NO_ERROR;
+}
+
/* Directories entries sorted by revision (decreasing - to max cache hits)
* and offset (increasing - to max benefit from APR file buffering).
*/
@@ -86,6 +311,125 @@ svn_fs_fs__order_dir_entries(svn_fs_t *f
return result;
}
+/* Append CONTEXT->START_REV to the context's pack file with no re-ordering.
+ * This function will only be used for very large revisions (>>100k changes).
+ * Use POOL for temporary allocations.
+ */
+static svn_error_t *
+append_revision(pack_context_t *context,
+ apr_pool_t *pool)
+{
+ apr_off_t offset = 0;
+ apr_pool_t *iterpool = svn_pool_create(pool);
+ apr_file_t *rev_file;
+ apr_finfo_t finfo;
+
+ /* Get the size of the file. */
+ const char *path = svn_dirent_join(context->shard_dir,
+ apr_psprintf(iterpool, "%ld",
+ context->start_rev),
+ pool);
+ SVN_ERR(svn_io_stat(&finfo, path, APR_FINFO_SIZE, pool));
+
+ /* Copy all the bits from the rev file to the end of the pack file. */
+ SVN_ERR(svn_io_file_open(&rev_file, path,
+ APR_READ | APR_BUFFERED | APR_BINARY,
+ APR_OS_DEFAULT, pool));
+ SVN_ERR(copy_file_data(context, context->pack_file, rev_file, finfo.size,
+ iterpool));
+
+ /* mark the start of a new revision */
+ SVN_ERR(svn_fs_fs__l2p_proto_index_add_revision(context->proto_l2p_index,
+ pool));
+
+ /* read the phys-to-log index file until we covered the whole rev file.
+ * That index contains enough info to build both target indexes from it. */
+ while (offset < finfo.size)
+ {
+ /* read one cluster */
+ int i;
+ apr_array_header_t *entries;
+ SVN_ERR(svn_fs_fs__p2l_index_lookup(&entries, context->fs,
+ context->start_rev, offset,
+ iterpool));
+
+ for (i = 0; i < entries->nelts; ++i)
+ {
+ svn_fs_fs__p2l_entry_t *entry
+ = &APR_ARRAY_IDX(entries, i, svn_fs_fs__p2l_entry_t);
+
+ /* skip first entry if that was duplicated due crossing a
+ cluster boundary */
+ if (offset > entry->offset)
+ continue;
+
+ /* process entry while inside the rev file */
+ offset = entry->offset;
+ if (offset < finfo.size)
+ {
+ entry->offset += context->pack_offset;
+ offset += entry->size;
+ SVN_ERR(svn_fs_fs__l2p_proto_index_add_entry
+ (context->proto_l2p_index, entry->offset,
+ entry->item.number, iterpool));
+ SVN_ERR(svn_fs_fs__p2l_proto_index_add_entry
+ (context->proto_p2l_index, entry, iterpool));
+ }
+ }
+
+ svn_pool_clear(iterpool);
+ }
+
+ svn_pool_destroy(iterpool);
+ context->pack_offset += finfo.size;
+
+ return SVN_NO_ERROR;
+}
+
+/* Logical addressing mode packing logic.
+ *
+ * Pack the revision shard starting at SHARD_REV in filesystem FS from
+ * SHARD_DIR into the PACK_FILE_DIR, using POOL for allocations. Limit
+ * the extra memory consumption to MAX_MEM bytes. CANCEL_FUNC and
+ * CANCEL_BATON are what you think they are.
+ */
+static svn_error_t *
+pack_log_addressed(svn_fs_t *fs,
+ const char *pack_file_dir,
+ const char *shard_dir,
+ svn_revnum_t shard_rev,
+ svn_cancel_func_t cancel_func,
+ void *cancel_baton,
+ apr_pool_t *pool)
+{
+ pack_context_t context = { 0 };
+ svn_revnum_t rev;
+ apr_pool_t *iterpool = svn_pool_create(pool);
+
+ /* set up a pack context */
+ SVN_ERR(initialize_pack_context(&context, fs, pack_file_dir, shard_dir,
+ shard_rev, cancel_func,
+ cancel_baton, pool));
+
+ /* pack revisions in ranges that don't exceed MAX_MEM */
+ for (rev = context.shard_rev; rev < context.shard_end_rev; ++rev)
+ {
+ context.start_rev = rev;
+ context.end_rev = rev + 1;
+
+ SVN_ERR(append_revision(&context, iterpool));
+
+ svn_pool_clear(iterpool);
+ }
+
+ /* last phase: finalize indexes and clean up */
+ SVN_ERR(reset_pack_context(&context, iterpool));
+ SVN_ERR(close_pack_context(&context, iterpool));
+ svn_pool_destroy(iterpool);
+
+ return SVN_NO_ERROR;
+}
+
/* Given REV in FS, set *REV_OFFSET to REV's offset in the packed file.
Use POOL for temporary allocations. */
svn_error_t *
@@ -151,7 +495,8 @@ svn_fs_fs__get_packed_offset(apr_off_t *
return svn_cache__set(ffd->packed_offset_cache, &shard, manifest, pool);
}
-/* Packing logic: Simply concatenate all revision contents.
+/* Packing logic for physical addresssing mode:
+ * Simply concatenate all revision contents.
*
* Pack the revision shard starting at SHARD_REV containing exactly
* MAX_FILES_PER_DIR revisions from SHARD_PATH into the PACK_FILE_DIR,
@@ -262,9 +607,13 @@ pack_rev_shard(svn_fs_t *fs,
SVN_ERR(svn_io_dir_make(pack_file_dir, APR_OS_DEFAULT, pool));
/* Index information files */
- SVN_ERR(pack_phys_addressed(pack_file_dir, shard_path, shard_rev,
- max_files_per_dir, cancel_func,
- cancel_baton, pool));
+ if (svn_fs_fs__use_log_addressing(fs, shard_rev))
+ SVN_ERR(pack_log_addressed(fs, pack_file_dir, shard_path, shard_rev,
+ cancel_func, cancel_baton, pool));
+ else
+ SVN_ERR(pack_phys_addressed(pack_file_dir, shard_path, shard_rev,
+ max_files_per_dir, cancel_func,
+ cancel_baton, pool));
SVN_ERR(svn_io_copy_perms(shard_path, pack_file_dir, pool));
SVN_ERR(svn_io_set_file_read_only(pack_file_path, FALSE, pool));