Philip Martin <philip.mar...@wandisco.com> writes:

> Evgeny Kotkov <evgeny.kot...@visualsvn.com> writes:
>
>> Or is it just a gut feeling that we should be using streams here?
>
> We have been gradually moving our file-based code to stream-based code
> for years.

I believe that flushing files (instead of achieving the same behavior with
streams) is a better choice in this particular context.  The relevant bits of
code in libsvn_fs_fs/revprops.c that are responsible for persisting revision
property changes work with files, and we cannot do something with that.  We
can abstract certain parts of this logic with streams — e.g., when it comes
to functions that serialize changes, because they do not really care if the
destination is a file or a generic stream.

However, an attempt to provide a proper abstraction for fsync() with streams
is a step towards dynamic typing with a possibility of a runtime error like
SVN_ERR_STREAM_NOT_SUPPORTED.  I think that in this particular case using
apr_file_t is better than forcing the usage of svn_stream_t just because we
have been moving from files toward streams.  We have a temporary file on the
disk, and we need to fsync() it when we finished working with it.  What could
be more straightforward than calling svn_io_file_flush_to_disk() on the file
right before svn_io_file_close()?

I'd like to point out that we have an option of achieving the same behavior
with streams, if we're aiming towards this.  While this is probably a slight
overkill, we could add svn_stream_flush() — inspired by design of the .NET
system library [1,2] — to our API and provide the necessary flush handlers.
This could be useful not just for files, but also for svn_stream_compressed()
and maybe for flushing network buffers if we ever need to do so.

I sketched this option in the attached patch.  With this patch applied, we
could rework the r1680819 fix like below.  What do you think?
[[[
Index: subversion/libsvn_fs_fs/revprops.c
===================================================================
--- subversion/libsvn_fs_fs/revprops.c  (revision 1680705)
+++ subversion/libsvn_fs_fs/revprops.c  (working copy)
@@ -661,6 +661,7 @@ write_non_packed_revprop(const char **final_path,
                                  svn_dirent_dirname(*final_path, pool),
                                  svn_io_file_del_none, pool, pool));
   SVN_ERR(svn_hash_write2(proplist, stream, SVN_HASH_TERMINATOR, pool));
+  SVN_ERR(svn_stream_flush(stream, TRUE));
   SVN_ERR(svn_stream_close(stream));

   return SVN_NO_ERROR;
@@ -811,9 +812,8 @@ repack_revprops(svn_fs_t *fs,
                           ? SVN_DELTA_COMPRESSION_LEVEL_DEFAULT
                           : SVN_DELTA_COMPRESSION_LEVEL_NONE));

-  /* finally, write the content to the target stream and close it */
+  /* finally, write the content to the target stream */
   SVN_ERR(svn_stream_write(file_stream, compressed->data, &compressed->len));
-  SVN_ERR(svn_stream_close(file_stream));

   return SVN_NO_ERROR;
 }
@@ -933,6 +933,8 @@ write_packed_revprop(const char **final_path,
       SVN_ERR(repack_revprops(fs, revprops, 0, revprops->sizes->nelts,
                               changed_index, serialized, new_total_size,
                               stream, pool));
+      SVN_ERR(svn_stream_flush(stream, TRUE));
+      SVN_ERR(svn_stream_close(stream));
     }
   else
     {
@@ -983,6 +985,8 @@ write_packed_revprop(const char **final_path,
           SVN_ERR(repack_revprops(fs, revprops, 0, left_count,
                                   changed_index, serialized, new_total_size,
                                   stream, pool));
+          SVN_ERR(svn_stream_flush(stream, TRUE));
+          SVN_ERR(svn_stream_close(stream));
         }

       if (left_count + right_count < revprops->sizes->nelts)
@@ -994,6 +998,8 @@ write_packed_revprop(const char **final_path,
                                   changed_index + 1,
                                   changed_index, serialized, new_total_size,
                                   stream, pool));
+          SVN_ERR(svn_stream_flush(stream, TRUE));
+          SVN_ERR(svn_stream_close(stream));
         }

       if (right_count)
@@ -1007,6 +1013,8 @@ write_packed_revprop(const char **final_path,
                                   revprops->sizes->nelts, changed_index,
                                   serialized, new_total_size, stream,
                                   pool));
+          SVN_ERR(svn_stream_flush(stream, TRUE));
+          SVN_ERR(svn_stream_close(stream));
         }

       /* write the new manifest */
@@ -1021,6 +1029,7 @@ write_packed_revprop(const char **final_path,
           SVN_ERR(svn_stream_printf(stream, pool, "%s\n", filename));
         }

+      SVN_ERR(svn_stream_flush(stream, TRUE));
       SVN_ERR(svn_stream_close(stream));
     }
]]]

[1] https://msdn.microsoft.com/en-us/library/system.io.stream.flush
[2] https://msdn.microsoft.com/en-us/library/ee474552


Regards,
Evgeny Kotkov
Index: subversion/include/svn_io.h
===================================================================
--- subversion/include/svn_io.h (revision 1681582)
+++ subversion/include/svn_io.h (working copy)
@@ -918,6 +918,14 @@ typedef svn_error_t *(*svn_stream_seek_fn_t)(void
 typedef svn_error_t *(*svn_stream_data_available_fn_t)(void *baton,
                                               svn_boolean_t *data_available);
 
+/** Flush handler function for a generic stream. @see svn_stream_t and
+ * svn_stream_flush().
+ *
+ * @since New in 1.9.
+ */
+typedef svn_error_t *(*svn_stream_flush_fn_t)(void *baton,
+                                              svn_boolean_t sync);
+
 /** Create a generic stream.  @see svn_stream_t. */
 svn_stream_t *
 svn_stream_create(void *baton,
@@ -992,6 +1000,14 @@ void
 svn_stream_set_data_available(svn_stream_t *stream,
                               svn_stream_data_available_fn_t data_available);
 
+/** Set @a stream's flush function to @a flush_fn
+ *
+ * @since New in 1.9.
+ */
+void
+svn_stream_set_flush(svn_stream_t *stream,
+                     svn_stream_flush_fn_t flush_fn);
+
 /** Create a stream that is empty for reading and infinite for writing. */
 svn_stream_t *
 svn_stream_empty(apr_pool_t *pool);
@@ -1358,6 +1374,19 @@ svn_error_t *
 svn_stream_data_available(svn_stream_t *stream,
                           svn_boolean_t *data_available);
 
+/** Flushes all available internal buffers in a generic @a stream.  If
+ * @sync is non-zero, it causes any buffered data to be written to the
+ * underlying device (for example, to the disk for a file stream).
+ *
+ * This function returns the #SVN_ERR_STREAM_NOT_SUPPORTED error if the
+ * stream doesn't implement flushing.
+ *
+ * @since New in 1.9.
+ */
+svn_error_t *
+svn_stream_flush(svn_stream_t *stream,
+                 svn_boolean_t sync);
+
 /** Return a writable stream which, when written to, writes to both of the
  * underlying streams.  Both of these streams will be closed upon closure of
  * the returned stream; use svn_stream_disown() if this is not the desired
@@ -1548,8 +1577,8 @@ typedef svn_error_t *
  *
  * @a open_func and @a open_baton are a callback function/baton pair
  * which will be invoked upon the first access of the returned
- * stream (read, write, mark, seek, skip, or possibly close).  The
- * callback shall open the primary stream.
+ * stream (read, write, mark, seek, skip, flush or possibly close).
+ * The callback shall open the primary stream.
  *
  * If the only "access" the returned stream gets is to close it
  * then @a open_func will only be called if @a open_on_close is TRUE.
Index: subversion/libsvn_subr/stream.c
===================================================================
--- subversion/libsvn_subr/stream.c     (revision 1681582)
+++ subversion/libsvn_subr/stream.c     (working copy)
@@ -60,6 +60,7 @@ struct svn_stream_t {
   svn_stream_mark_fn_t mark_fn;
   svn_stream_seek_fn_t seek_fn;
   svn_stream_data_available_fn_t data_available_fn;
+  svn_stream_flush_fn_t flush_fn;
   svn_stream__is_buffered_fn_t is_buffered_fn;
   apr_file_t *file; /* Maybe NULL */
 };
@@ -138,6 +139,13 @@ svn_stream_set_data_available(svn_stream_t *stream
 }
 
 void
+svn_stream_set_flush(svn_stream_t *stream,
+                     svn_stream_flush_fn_t flush_fn)
+{
+  stream->flush_fn = flush_fn;
+}
+
+void
 svn_stream__set_is_buffered(svn_stream_t *stream,
                             svn_stream__is_buffered_fn_t is_buffered_fn)
 {
@@ -257,6 +265,15 @@ svn_stream_data_available(svn_stream_t *stream,
                                                    data_available));
 }
 
+svn_error_t *
+svn_stream_flush(svn_stream_t *stream, svn_boolean_t sync)
+{
+  if (stream->flush_fn == NULL)
+    return svn_error_create(SVN_ERR_STREAM_NOT_SUPPORTED, NULL, NULL);
+
+  return svn_error_trace(stream->flush_fn(stream->baton, sync));
+}
+
 svn_boolean_t
 svn_stream__is_buffered(svn_stream_t *stream)
 {
@@ -656,6 +673,12 @@ seek_handler_empty(void *baton, const svn_stream_m
   return SVN_NO_ERROR;
 }
 
+static svn_error_t *
+flush_handler_empty(void *baton, svn_boolean_t sync)
+{
+  return SVN_NO_ERROR;
+}
+
 static svn_boolean_t
 is_buffered_handler_empty(void *baton)
 {
@@ -673,6 +696,7 @@ svn_stream_empty(apr_pool_t *pool)
   svn_stream_set_write(stream, write_handler_empty);
   svn_stream_set_mark(stream, mark_handler_empty);
   svn_stream_set_seek(stream, seek_handler_empty);
+  svn_stream_set_flush(stream, flush_handler_empty);
   svn_stream__set_is_buffered(stream, is_buffered_handler_empty);
   return stream;
 }
@@ -697,7 +721,17 @@ write_handler_tee(void *baton, const char *data, a
   return SVN_NO_ERROR;
 }
 
+static svn_error_t *
+flush_handler_tee(void *baton, svn_boolean_t sync)
+{
+  struct baton_tee *bt = baton;
 
+  SVN_ERR(svn_stream_flush(bt->out1, sync));
+  SVN_ERR(svn_stream_flush(bt->out2, sync));
+
+  return SVN_NO_ERROR;
+}
+
 static svn_error_t *
 close_handler_tee(void *baton)
 {
@@ -729,6 +763,7 @@ svn_stream_tee(svn_stream_t *out1,
   baton->out2 = out2;
   stream = svn_stream_create(baton, pool);
   svn_stream_set_write(stream, write_handler_tee);
+  svn_stream_set_flush(stream, flush_handler_tee);
   svn_stream_set_close(stream, close_handler_tee);
 
   return stream;
@@ -780,6 +815,12 @@ data_available_disown(void *baton, svn_boolean_t *
   return svn_error_trace(svn_stream_data_available(baton, data_available));
 }
 
+static svn_error_t *
+flush_handler_disown(void *baton, svn_boolean_t sync)
+{
+  return svn_error_trace(svn_stream_flush(baton, sync));
+}
+
 static svn_boolean_t
 is_buffered_handler_disown(void *baton)
 {
@@ -797,6 +838,7 @@ svn_stream_disown(svn_stream_t *stream, apr_pool_t
   svn_stream_set_mark(s, mark_handler_disown);
   svn_stream_set_seek(s, seek_handler_disown);
   svn_stream_set_data_available(s, data_available_disown);
+  svn_stream_set_flush(s, flush_handler_disown);
   svn_stream__set_is_buffered(s, is_buffered_handler_disown);
 
   return s;
@@ -971,6 +1013,19 @@ data_available_handler_apr(void *baton, svn_boolea
     }
 }
 
+static svn_error_t *
+flush_handler_apr(void *baton, svn_boolean_t sync)
+{
+  struct baton_apr *btn = baton;
+
+  if (sync)
+    SVN_ERR(svn_io_file_flush_to_disk(btn->file, btn->pool));
+  else
+    SVN_ERR(svn_io_file_flush(btn->file, btn->pool));
+
+  return SVN_NO_ERROR;
+}
+
 static svn_boolean_t
 is_buffered_handler_apr(void *baton)
 {
@@ -1053,6 +1108,7 @@ svn_stream_from_aprfile2(apr_file_t *file,
   svn_stream_set_mark(stream, mark_handler_apr);
   svn_stream_set_seek(stream, seek_handler_apr);
   svn_stream_set_data_available(stream, data_available_handler_apr);
+  svn_stream_set_flush(stream, flush_handler_apr);
   svn_stream__set_is_buffered(stream, is_buffered_handler_apr);
   stream->file = file;
 
@@ -1370,6 +1426,14 @@ data_available_handler_checksum(void *baton, svn_b
 }
 
 static svn_error_t *
+flush_handler_checksum(void *baton, svn_boolean_t sync)
+{
+  struct checksum_stream_baton *btn = baton;
+
+  return svn_error_trace(svn_stream_flush(btn->proxy, sync));
+}
+
+static svn_error_t *
 close_handler_checksum(void *baton)
 {
   struct checksum_stream_baton *btn = baton;
@@ -1433,6 +1497,7 @@ svn_stream_checksummed2(svn_stream_t *stream,
   svn_stream_set_read2(s, read_handler_checksum, read_full_handler_checksum);
   svn_stream_set_write(s, write_handler_checksum);
   svn_stream_set_data_available(s, data_available_handler_checksum);
+  svn_stream_set_flush(s, flush_handler_checksum);
   svn_stream_set_close(s, close_handler_checksum);
   return s;
 }
@@ -1553,6 +1618,12 @@ data_available_handler_stringbuf(void *baton, svn_
   return SVN_NO_ERROR;
 }
 
+static svn_error_t *
+flush_handler_stringbuf(void *baton, svn_boolean_t sync)
+{
+  return SVN_NO_ERROR;
+}
+
 static svn_boolean_t
 is_buffered_handler_stringbuf(void *baton)
 {
@@ -1579,6 +1650,7 @@ svn_stream_from_stringbuf(svn_stringbuf_t *str,
   svn_stream_set_mark(stream, mark_handler_stringbuf);
   svn_stream_set_seek(stream, seek_handler_stringbuf);
   svn_stream_set_data_available(stream, data_available_handler_stringbuf);
+  svn_stream_set_flush(stream, flush_handler_stringbuf);
   svn_stream__set_is_buffered(stream, is_buffered_handler_stringbuf);
   return stream;
 }
@@ -1658,6 +1730,12 @@ data_available_handler_string(void *baton, svn_boo
   return SVN_NO_ERROR;
 }
 
+static svn_error_t *
+flush_handler_string(void *baton, svn_boolean_t sync)
+{
+  return SVN_NO_ERROR;
+}
+
 static svn_boolean_t
 is_buffered_handler_string(void *baton)
 {
@@ -1683,6 +1761,7 @@ svn_stream_from_string(const svn_string_t *str,
   svn_stream_set_seek(stream, seek_handler_string);
   svn_stream_set_skip(stream, skip_handler_string);
   svn_stream_set_data_available(stream, data_available_handler_string);
+  svn_stream_set_flush(stream, flush_handler_string);
   svn_stream__set_is_buffered(stream, is_buffered_handler_string);
   return stream;
 }
@@ -1931,6 +2010,17 @@ data_available_handler_lazyopen(void *baton,
                                                    data_available));
 }
 
+static svn_error_t *
+flush_handler_lazyopen(void *baton, svn_boolean_t sync)
+{
+  lazyopen_baton_t *b = baton;
+
+  SVN_ERR(lazyopen_if_unopened(b));
+  SVN_ERR(svn_stream_flush(b->real_stream, sync));
+
+  return SVN_NO_ERROR;
+}
+
 /* Implements svn_stream__is_buffered_fn_t */
 static svn_boolean_t
 is_buffered_lazyopen(void *baton)
@@ -1968,6 +2058,7 @@ svn_stream_lazyopen_create(svn_stream_lazyopen_fun
   svn_stream_set_mark(stream, mark_handler_lazyopen);
   svn_stream_set_seek(stream, seek_handler_lazyopen);
   svn_stream_set_data_available(stream, data_available_handler_lazyopen);
+  svn_stream_set_flush(stream, flush_handler_lazyopen);
   svn_stream__set_is_buffered(stream, is_buffered_lazyopen);
 
   return stream;

Reply via email to