Author: stefan2
Date: Fri Aug  9 10:43:49 2013
New Revision: 1512244

URL: http://svn.apache.org/r1512244
Log:
On the log-addressing branch:  Add minimal packing support for logically
addressed shards to PASS more tests.

This is not transient code but will always be needed as a fallback for
extremely large revisions.  It also introduces the "pack context"
infrastructure.  Later revisions will add reordering logic.

* subversion/libsvn_fs_fs/pack.c
  (pack_context_t
   initialize_pack_context,
   reset_pack_context,
   close_pack_context): introduce pack context infrastructure
  (copy_file_data): utility to efficiently (space and time) copy a data
                    block from one file to another
  (append_revision,
   pack_log_addressed): non-reordering packing logic for log. addressing
  (pack_phys_addressed): tweak docstring
  (pack_rev_shard): call the new logic for logically addressed shards

Modified:
    subversion/branches/log-addressing/subversion/libsvn_fs_fs/pack.c

Modified: subversion/branches/log-addressing/subversion/libsvn_fs_fs/pack.c
URL: 
http://svn.apache.org/viewvc/subversion/branches/log-addressing/subversion/libsvn_fs_fs/pack.c?rev=1512244&r1=1512243&r2=1512244&view=diff
==============================================================================
--- subversion/branches/log-addressing/subversion/libsvn_fs_fs/pack.c (original)
+++ subversion/branches/log-addressing/subversion/libsvn_fs_fs/pack.c Fri Aug  
9 10:43:49 2013
@@ -32,6 +32,7 @@
 #include "pack.h"
 #include "util.h"
 #include "id.h"
+#include "index.h"
 #include "low_level.h"
 #include "revprops.h"
 #include "transaction.h"
@@ -41,6 +42,230 @@
 #include "svn_private_config.h"
 #include "temp_serializer.h"
 
+/* This structure keeps track of all the temporary data and status that
+ * needs to be kept around during the creation of one pack file.  After
+ * each revision range (in case we can't process all revs at once due to
+ * memory restrictions), parts of the data will get re-initialized.
+ */
+typedef struct pack_context_t
+{
+  /* file system that we operate on */
+  svn_fs_t *fs;
+
+  /* cancel function to invoke at regular intervals; skipped if NULL */
+  svn_cancel_func_t cancel_func;
+
+  /* baton to pass to CANCEL_FUNC */
+  void *cancel_baton;
+
+  /* first revision in the shard (and in the future pack file) */
+  svn_revnum_t shard_rev;
+
+  /* first revision in the range being processed (>= SHARD_REV) */
+  svn_revnum_t start_rev;
+
+  /* first revision after the range being processed (<= SHARD_END_REV) */
+  svn_revnum_t end_rev;
+
+  /* first revision after the current shard */
+  svn_revnum_t shard_end_rev;
+
+  /* log-to-phys proto index covering the whole pack file */
+  apr_file_t *proto_l2p_index;
+
+  /* phys-to-log proto index covering the whole pack file */
+  apr_file_t *proto_p2l_index;
+
+  /* full shard directory path (containing the unpacked revisions) */
+  const char *shard_dir;
+
+  /* full packed shard directory path (containing the pack file + indexes) */
+  const char *pack_file_dir;
+
+  /* full pack file path (including PACK_FILE_DIR) */
+  const char *pack_file_path;
+
+  /* number of bytes appended to the pack file so far (i.e. its length) */
+  apr_off_t pack_offset;
+
+  /* the pack file to ultimately write all data to */
+  apr_file_t *pack_file;
+
+  /* pool for temporary data structures of the current revision range;
+   * it gets cleared by reset_pack_context() */
+  apr_pool_t *info_pool;
+} pack_context_t;
+
+/* Initialize the caller-provided pack context CONTEXT for packing shard
+ * SHARD_REV in SHARD_DIR into PACK_FILE_DIR within filesystem FS.  Create
+ * the pack file and open both proto index files, all allocated in POOL.
+ *
+ * CONTEXT->PACK_OFFSET is not set here; the caller must pre-zero CONTEXT.
+ * Store CANCEL_FUNC and CANCEL_BATON in CONTEXT as well.
+ */
+static svn_error_t *
+initialize_pack_context(pack_context_t *context,
+                        svn_fs_t *fs,
+                        const char *pack_file_dir,
+                        const char *shard_dir,
+                        svn_revnum_t shard_rev,
+                        svn_cancel_func_t cancel_func,
+                        void *cancel_baton,
+                        apr_pool_t *pool)
+{
+  fs_fs_data_t *ffd = fs->fsap_data;
+  const char *temp_dir;
+  
+  SVN_ERR_ASSERT(ffd->format >= SVN_FS_FS__MIN_LOG_ADDRESSING_FORMAT);
+  SVN_ERR_ASSERT(shard_rev % ffd->max_files_per_dir == 0);
+  
+  /* temp file location; NOTE(review): TEMP_DIR is not used below yet */
+  SVN_ERR(svn_io_temp_dir(&temp_dir, pool));
+
+  /* store parameters */
+  context->fs = fs;
+  context->cancel_func = cancel_func;
+  context->cancel_baton = cancel_baton;
+
+  context->shard_rev = shard_rev;
+  context->start_rev = shard_rev;
+  context->end_rev = shard_rev;
+  context->shard_end_rev = shard_rev + ffd->max_files_per_dir;
+  
+  /* Remember the directories and create the new pack file. */
+  context->shard_dir = shard_dir;
+  context->pack_file_dir = pack_file_dir;
+  context->pack_file_path
+    = svn_dirent_join(pack_file_dir, PATH_PACKED, pool);
+  SVN_ERR(svn_io_file_open(&context->pack_file, context->pack_file_path,
+                           APR_WRITE | APR_BUFFERED | APR_BINARY | APR_EXCL
+                             | APR_CREATE, APR_OS_DEFAULT, pool));
+
+  /* Open the proto index files next to the pack file */
+  SVN_ERR(svn_fs_fs__l2p_proto_index_open
+            (&context->proto_l2p_index,
+             svn_dirent_join(pack_file_dir,
+                             PATH_INDEX PATH_EXT_L2P_INDEX,
+                             pool),
+             pool));
+  SVN_ERR(svn_fs_fs__p2l_proto_index_open
+            (&context->proto_p2l_index,
+             svn_dirent_join(pack_file_dir,
+                             PATH_INDEX PATH_EXT_P2L_INDEX,
+                             pool),
+             pool));
+
+  /* the pool used for temp structures (a sub-pool of POOL) */
+  context->info_pool = svn_pool_create(pool);
+
+  return SVN_NO_ERROR;
+};
+
+/* Clean up / free all revision range specific data in CONTEXT.  Currently,
+ * that merely clears CONTEXT->INFO_POOL; POOL is not used.
+ */
+static svn_error_t *
+reset_pack_context(pack_context_t *context,
+                   apr_pool_t *pool)
+{
+  svn_pool_clear(context->info_pool);
+  
+  return SVN_NO_ERROR;
+};
+
+/* Call this after the last revision range.  It builds the final index files
+ * from CONTEXT's proto index files, removes the latter and closes all files
+ * opened for CONTEXT.  Use POOL for temporary allocations.  */
+static svn_error_t *
+close_pack_context(pack_context_t *context,
+                   apr_pool_t *pool)
+{
+  const char *l2p_index_path
+    = apr_pstrcat(pool, context->pack_file_path, PATH_EXT_L2P_INDEX, NULL);
+  const char *p2l_index_path
+    = apr_pstrcat(pool, context->pack_file_path, PATH_EXT_P2L_INDEX, NULL);
+  const char *proto_l2p_index_path;
+  const char *proto_p2l_index_path;
+
+  /* need the file names for the actual index creation call further down */
+  SVN_ERR(svn_io_file_name_get(&proto_l2p_index_path,
+                               context->proto_l2p_index, pool));
+  SVN_ERR(svn_io_file_name_get(&proto_p2l_index_path,
+                               context->proto_p2l_index, pool));
+  
+  /* close the proto index files; they get accessed by name from here on */
+  SVN_ERR(svn_io_file_close(context->proto_l2p_index, pool));
+  SVN_ERR(svn_io_file_close(context->proto_p2l_index, pool));
+
+  /* Create the actual index files */
+  SVN_ERR(svn_fs_fs__l2p_index_create(context->fs, l2p_index_path,
+                                      proto_l2p_index_path,
+                                      context->shard_rev, pool));
+  SVN_ERR(svn_fs_fs__p2l_index_create(context->fs, p2l_index_path,
+                                      proto_p2l_index_path,
+                                      context->shard_rev, pool));
+
+  /* remove proto index files (it is an error if they are missing) */
+  SVN_ERR(svn_io_remove_file2(proto_l2p_index_path, FALSE, pool));
+  SVN_ERR(svn_io_remove_file2(proto_p2l_index_path, FALSE, pool));
+
+  SVN_ERR(svn_io_file_close(context->pack_file, pool));
+
+  return SVN_NO_ERROR;
+};
+
+/* Copy SIZE bytes from SOURCE to DEST.  For data not fitting the small
+ * stack buffer, invoke CONTEXT's CANCEL_FUNC (if any) before each chunk.
+ * Use POOL for allocations.  */
+static svn_error_t *
+copy_file_data(pack_context_t *context,
+               apr_file_t *dest,
+               apr_file_t *source,
+               apr_off_t size,
+               apr_pool_t *pool)
+{
+  /* most non-representation items will be small.  Minimize the buffer
+   * and infrastructure overhead in that case. */
+  enum { STACK_BUFFER_SIZE = 1024 };
+ 
+  if (size < STACK_BUFFER_SIZE)
+    {
+      /* copy small data in one go using a fixed-size buffer on stack */
+      char buffer[STACK_BUFFER_SIZE];
+      SVN_ERR(svn_io_file_read_full2(source, buffer, (apr_size_t)size,
+                                     NULL, NULL, pool));
+      SVN_ERR(svn_io_file_write_full(dest, buffer, (apr_size_t)size,
+                                     NULL, pool));
+    }
+  else
+    {
+      /* copy larger data in chunks of at most FFD->BLOCK_SIZE bytes.  The
+       * chunk buffer lives in its own sub-pool to make sure that this
+       * extra memory is released asap. */
+      fs_fs_data_t *ffd = context->fs->fsap_data;
+      apr_pool_t *copypool = svn_pool_create(pool);
+      char *buffer = apr_palloc(copypool, ffd->block_size);
+
+      while (size)
+        {
+          apr_size_t to_copy = (apr_size_t)(MIN(size, ffd->block_size));
+          if (context->cancel_func)
+            SVN_ERR(context->cancel_func(context->cancel_baton));
+
+          SVN_ERR(svn_io_file_read_full2(source, buffer, to_copy,
+                                         NULL, NULL, pool));
+          SVN_ERR(svn_io_file_write_full(dest, buffer, to_copy,
+                                         NULL, pool));
+
+          size -= to_copy;
+        }
+
+      svn_pool_destroy(copypool);
+    }
+
+  return SVN_NO_ERROR;
+}
+
 /* Directories entries sorted by revision (decreasing - to max cache hits)
  * and offset (increasing - to max benefit from APR file buffering).
  */
@@ -86,6 +311,125 @@ svn_fs_fs__order_dir_entries(svn_fs_t *f
   return result;
 }
 
+/* Append CONTEXT->START_REV to the context's pack file with no re-ordering
+ * and add its items, shifted by CONTEXT->PACK_OFFSET, to the proto indexes.
+ * Meant as the fallback for very large revisions (>>100k changes).
+ * Advances CONTEXT->PACK_OFFSET.  Use POOL for temporary allocations.  */
+static svn_error_t *
+append_revision(pack_context_t *context,
+                apr_pool_t *pool)
+{
+  apr_off_t offset = 0;
+  apr_pool_t *iterpool = svn_pool_create(pool);
+  apr_file_t *rev_file;
+  apr_finfo_t finfo;
+
+  /* Get the size of the revision file. */
+  const char *path = svn_dirent_join(context->shard_dir,
+                                     apr_psprintf(iterpool, "%ld",
+                                                  context->start_rev),
+                                     pool);
+  SVN_ERR(svn_io_stat(&finfo, path, APR_FINFO_SIZE, pool));
+
+  /* Copy all the bits from the rev file to the end of the pack file. */
+  SVN_ERR(svn_io_file_open(&rev_file, path,
+                           APR_READ | APR_BUFFERED | APR_BINARY,
+                           APR_OS_DEFAULT, pool));
+  SVN_ERR(copy_file_data(context, context->pack_file, rev_file, finfo.size, 
+                         iterpool));
+
+  /* mark the start of a new revision */
+  SVN_ERR(svn_fs_fs__l2p_proto_index_add_revision(context->proto_l2p_index,
+                                                  pool));
+
+  /* read the phys-to-log index file until we covered the whole rev file.
+   * That index contains enough info to build both target indexes from it. */
+  while (offset < finfo.size)
+    {
+      /* read one cluster, i.e. the index entries found at OFFSET */
+      int i;
+      apr_array_header_t *entries;
+      SVN_ERR(svn_fs_fs__p2l_index_lookup(&entries, context->fs,
+                                          context->start_rev, offset,
+                                          iterpool));
+
+      for (i = 0; i < entries->nelts; ++i)
+        {
+          svn_fs_fs__p2l_entry_t *entry
+            = &APR_ARRAY_IDX(entries, i, svn_fs_fs__p2l_entry_t);
+
+          /* skip first entry if that was duplicated due to crossing a
+             cluster boundary */
+          if (offset > entry->offset)
+            continue;
+
+          /* process entry while inside the rev file */
+          offset = entry->offset;
+          if (offset < finfo.size)
+            {
+              entry->offset += context->pack_offset;
+              offset += entry->size;
+              SVN_ERR(svn_fs_fs__l2p_proto_index_add_entry
+                        (context->proto_l2p_index, entry->offset,
+                         entry->item.number, iterpool));
+              SVN_ERR(svn_fs_fs__p2l_proto_index_add_entry
+                        (context->proto_p2l_index, entry, iterpool));
+            }
+        }
+
+      svn_pool_clear(iterpool);
+    }
+
+  svn_pool_destroy(iterpool);
+  context->pack_offset += finfo.size;
+
+  return SVN_NO_ERROR;
+}
+
+/* Logical addressing mode packing logic.
+ *
+ * Pack the revision shard starting at SHARD_REV in filesystem FS from
+ * SHARD_DIR into the PACK_FILE_DIR, using POOL for allocations.  Revisions
+ * get appended one at a time, in order and without any reordering of their
+ * contents.  CANCEL_FUNC and CANCEL_BATON are what you think they are.
+ */
+static svn_error_t *
+pack_log_addressed(svn_fs_t *fs,
+                   const char *pack_file_dir,
+                   const char *shard_dir,
+                   svn_revnum_t shard_rev,
+                   svn_cancel_func_t cancel_func,
+                   void *cancel_baton,
+                   apr_pool_t *pool)
+{
+  pack_context_t context = { 0 };
+  svn_revnum_t rev;
+  apr_pool_t *iterpool = svn_pool_create(pool);
+
+  /* set up a pack context */
+  SVN_ERR(initialize_pack_context(&context, fs, pack_file_dir, shard_dir,
+                                  shard_rev, cancel_func,
+                                  cancel_baton, pool));
+
+  /* pack the shard's revisions one at a time, in revision order */
+  for (rev = context.shard_rev; rev < context.shard_end_rev; ++rev)
+    {
+      context.start_rev = rev;
+      context.end_rev = rev + 1;
+
+      SVN_ERR(append_revision(&context, iterpool));
+
+      svn_pool_clear(iterpool);
+    }
+
+  /* last phase: finalize indexes and clean up */
+  SVN_ERR(reset_pack_context(&context, iterpool));
+  SVN_ERR(close_pack_context(&context, iterpool));
+  svn_pool_destroy(iterpool);
+
+  return SVN_NO_ERROR;
+}
+
 /* Given REV in FS, set *REV_OFFSET to REV's offset in the packed file.
    Use POOL for temporary allocations. */
 svn_error_t *
@@ -151,7 +495,8 @@ svn_fs_fs__get_packed_offset(apr_off_t *
   return svn_cache__set(ffd->packed_offset_cache, &shard, manifest, pool);
 }
 
-/* Packing logic:  Simply concatenate all revision contents.
+/* Packing logic for physical addressing mode:
+ * Simply concatenate all revision contents.
  * 
  * Pack the revision shard starting at SHARD_REV containing exactly
  * MAX_FILES_PER_DIR revisions from SHARD_PATH into the PACK_FILE_DIR,
@@ -262,9 +607,13 @@ pack_rev_shard(svn_fs_t *fs,
   SVN_ERR(svn_io_dir_make(pack_file_dir, APR_OS_DEFAULT, pool));
 
   /* Index information files */
-  SVN_ERR(pack_phys_addressed(pack_file_dir, shard_path, shard_rev,
-                              max_files_per_dir, cancel_func,
-                              cancel_baton, pool));
+  if (svn_fs_fs__use_log_addressing(fs, shard_rev))
+    SVN_ERR(pack_log_addressed(fs, pack_file_dir, shard_path, shard_rev,
+                               cancel_func, cancel_baton, pool));
+  else
+    SVN_ERR(pack_phys_addressed(pack_file_dir, shard_path, shard_rev,
+                                max_files_per_dir, cancel_func,
+                                cancel_baton, pool));
   
   SVN_ERR(svn_io_copy_perms(shard_path, pack_file_dir, pool));
   SVN_ERR(svn_io_set_file_read_only(pack_file_path, FALSE, pool));


Reply via email to