Hi,

Attached is a patch that adds 'streaming' support to xz output: in short, LZMA_SYNC_FLUSH is called every X milliseconds. We find it helpful because it lets us effectively do:
----
tail -f foobar.log.xz | nc w.x.y.z 1234
----

Meanwhile foobar.log.xz is effectively being generated with:
----
tail -f foobar.log | xz -c --select-timeout 500 > foobar.log.xz
----

This means the receiver gets data that is decodable within X milliseconds, rather than having to wait for a whole block to be generated and flushed, which can take a considerable time if whatever writes to foobar.log is low volume (100 bytes per second, for example).

The patch is against 5.0.0 (the version currently in Debian 'oldstable/squeeze'), but if the community likes the look of it, I can roll a version for whatever is at the HEAD of the git tree.

Feedback welcomed.

Cheers

--
Alexander Clouter
.sigmonster says: Friction is a drag.
diff -u -r xz-utils-5.0.0.orig/src/xz/args.c xz-utils-5.0.0/src/xz/args.c
--- xz-utils-5.0.0.orig/src/xz/args.c	2010-10-23 15:47:33.000000000 +0100
+++ xz-utils-5.0.0/src/xz/args.c	2013-05-01 21:14:15.567999924 +0100
@@ -73,6 +73,7 @@
 		OPT_FILES0,
 		OPT_MEM_COMPRESS,
 		OPT_MEM_DECOMPRESS,
+		OPT_SELECT_TIMEOUT,
 		OPT_NO_ADJUST,
 		OPT_INFO_MEMORY,
 		OPT_ROBOT,
@@ -107,6 +108,7 @@
 		{ "memlimit-decompress", required_argument, NULL, OPT_MEM_DECOMPRESS },
 		{ "memlimit",     required_argument, NULL,  'M' },
 		{ "memory",       required_argument, NULL,  'M' }, // Old alias
+		{ "select-timeout", required_argument, NULL,  OPT_SELECT_TIMEOUT },
 		{ "no-adjust",    no_argument,       NULL,  OPT_NO_ADJUST },
 		{ "threads",      required_argument, NULL,  'T' },
 
@@ -169,6 +171,12 @@
 					true, true);
 			break;
 
+		// --select-timeout (opt_select_timeout is an int, hence INT_MAX)
+		case OPT_SELECT_TIMEOUT:
+			opt_select_timeout = str_to_uint64(
+					"select-timeout", optarg, 0, INT_MAX);
+			break;
+
 		// --suffix
 		case 'S':
 			suffix_set(optarg);
diff -u -r xz-utils-5.0.0.orig/src/xz/coder.c xz-utils-5.0.0/src/xz/coder.c
--- xz-utils-5.0.0.orig/src/xz/coder.c	2010-10-23 15:47:33.000000000 +0100
+++ xz-utils-5.0.0/src/xz/coder.c	2013-05-03 19:55:58.272000010 +0100
@@ -24,6 +24,7 @@
 enum operation_mode opt_mode = MODE_COMPRESS;
 enum format_type opt_format = FORMAT_AUTO;
 bool opt_auto_adjust = true;
+int opt_select_timeout = 0;
 
 
 /// Stream used to communicate with liblzma
@@ -255,6 +256,17 @@
 					memory_limit), 2));
 	}
 
+	if (opt_select_timeout != 0) {
+		if (opt_format == FORMAT_LZMA)
+			message_fatal(_("LZMA_SYNC_FLUSH is not available for LZMA1"));
+
+		if (opt_mode == MODE_COMPRESS)
+			message(V_DEBUG, _("LZMA_SYNC_FLUSH set for every %s milliseconds"),
+				uint64_to_str(opt_select_timeout, 0));
+		else
+			message_fatal(_("LZMA_SYNC_FLUSH is only available for compression"));
+	}
+
 /*
 	// Limit the number of worker threads so that memory usage
 	// limit isn't exceeded.
@@ -472,9 +484,12 @@
 	strm.avail_out = IO_BUFFER_SIZE;
 
 	while (!user_abort) {
+		// io_read() hit the --select-timeout: sync-flush pending data.
+		if (pair->select_timeout && !pair->src_eof)
+			action = LZMA_SYNC_FLUSH;
 		// Fill the input buffer if it is empty and we haven't reached
 		// end of file yet.
-		if (strm.avail_in == 0 && !pair->src_eof) {
+		else if (strm.avail_in == 0 && !pair->src_eof) {
 			strm.next_in = in_buf.u8;
 			strm.avail_in = io_read(
 					pair, &in_buf, IO_BUFFER_SIZE);
@@ -484,13 +499,16 @@
 
 			if (pair->src_eof)
 				action = LZMA_FINISH;
+			else if (strm.avail_in == 0)
+				continue;
 		}
 
 		// Let liblzma do the actual work.
 		ret = lzma_code(&strm, action);
 
 		// Write out if the output buffer became full.
-		if (strm.avail_out == 0) {
+		// While sync-flushing, also write out partial output.
+		if (strm.avail_out == 0 || action == LZMA_SYNC_FLUSH) {
 			if (opt_mode != MODE_TEST && io_write(pair, &out_buf,
 					IO_BUFFER_SIZE - strm.avail_out))
 				break;
@@ -518,6 +536,13 @@
 			}
 
 			if (ret == LZMA_STREAM_END) {
+				// The sync flush has completed; resume LZMA_RUN.
+				if (action == LZMA_SYNC_FLUSH) {
+					pair->select_timeout = false;
+					action = LZMA_RUN;
+					continue;
+				}
+
 				// Check that there is no trailing garbage.
 				// This is needed for LZMA_Alone and raw
 				// streams.
diff -u -r xz-utils-5.0.0.orig/src/xz/coder.h xz-utils-5.0.0/src/xz/coder.h
--- xz-utils-5.0.0.orig/src/xz/coder.h	2010-10-23 15:47:33.000000000 +0100
+++ xz-utils-5.0.0/src/xz/coder.h	2013-05-01 16:02:04.676000099 +0100
@@ -41,6 +41,8 @@
 /// they exceed the memory usage limit.
 extern bool opt_auto_adjust;
 
+/// --select-timeout interval in milliseconds; 0 disables periodic flushing.
+extern int opt_select_timeout;
 
 /// Set the integrity check type used when compressing
 extern void coder_set_check(lzma_check check);
diff -u -r xz-utils-5.0.0.orig/src/xz/file_io.c xz-utils-5.0.0/src/xz/file_io.c
--- xz-utils-5.0.0.orig/src/xz/file_io.c	2010-10-23 15:47:33.000000000 +0100
+++ xz-utils-5.0.0/src/xz/file_io.c	2013-05-03 19:55:56.672000010 +0100
@@ -13,6 +13,8 @@
 #include "private.h"
 
 #include <fcntl.h>
+#include <sys/time.h>
+#include <sys/select.h>
 
 #ifdef TUKLIB_DOSLIKE
 #	include <io.h>
@@ -290,6 +292,15 @@
 {
 	// There's nothing to open when reading from stdin.
 	if (pair->src_name == stdin_filename) {
+		if (opt_select_timeout != 0) {
+			// io_read() polls stdin with select() in this mode.
+			const int flags = fcntl(STDIN_FILENO, F_GETFL);
+			if (flags == -1 || fcntl(STDIN_FILENO, F_SETFL,
+					flags | O_NONBLOCK) == -1)
+				message_fatal(_("failed to set O_NONBLOCK on STDIN: %s"),
+						strerror(errno));
+		}
+
 		pair->src_fd = STDIN_FILENO;
 #ifdef TUKLIB_DOSLIKE
 		setmode(STDIN_FILENO, O_BINARY);
@@ -526,6 +537,7 @@
 		.src_eof = false,
 		.dest_try_sparse = false,
 		.dest_pending_sparse = 0,
+		.select_timeout = false,
 	};
 
 	// Block the signals, for which we have a custom signal handler, so
@@ -796,14 +808,81 @@
 extern size_t
 io_read(file_pair *pair, io_buf *buf_union, size_t size)
 {
+	// Start of the current --select-timeout interval. It is cleared when
+	// the interval expires so that the next call starts a new interval.
+	static struct timeval stv;
+	struct timeval ctv;
+
 	// We use small buffers here.
 	assert(size < SSIZE_MAX);
 
 	uint8_t *buf = buf_union->u8;
 	size_t left = size;
 
+	if (opt_select_timeout != 0 && !timerisset(&stv)) {
+		if (gettimeofday(&stv, NULL) != 0)
+			message_fatal(_("gettimeofday() failed: %s"),
+				strerror(errno));
+	}
+
 	while (left > 0) {
-		const ssize_t amount = read(pair->src_fd, buf, left);
+		ssize_t amount;
+
+		if (opt_select_timeout != 0) {
+			fd_set fds;
+			int retval;
+			struct timeval tv = {
+				.tv_sec		= opt_select_timeout / 1000,
+				.tv_usec	= (opt_select_timeout % 1000) * 1000,
+			};
+
+			FD_ZERO(&fds);
+			FD_SET(pair->src_fd, &fds);
+
+			// NOTE(review): gettimeofday() follows the wall clock, so
+			// a clock step can shorten or stretch the interval.
+			if (gettimeofday(&ctv, NULL) != 0)
+				message_fatal(_("gettimeofday() failed: %s"),
+					strerror(errno));
+
+			timersub(&ctv, &stv, &ctv);
+
+			// The interval has already expired: stop reading and
+			// flag the caller to sync-flush.
+			if (!timercmp(&tv, &ctv, >)) {
+				pair->select_timeout = true;
+				timerclear(&stv);
+				break;
+			}
+
+			// Wait for input for at most the rest of the interval.
+			// Only readability matters, so no exceptfds set is passed.
+			timersub(&tv, &ctv, &tv);
+
+			retval = select(pair->src_fd + 1,
+					&fds, NULL, NULL, &tv);
+
+			if (retval == -1) {
+				if (errno == EINTR) {
+					if (user_abort)
+						return SIZE_MAX;
+
+					continue;
+				}
+
+				message_error(_("select() error: %s"),
+					strerror(errno));
+				pair->src_eof = true;
+
+				return SIZE_MAX;
+			} else if (retval == 0) {
+				pair->select_timeout = true;
+				timerclear(&stv);
+				break;
+			}
+		}
+
+		amount = read(pair->src_fd, buf, left);
 
 		if (amount == 0) {
 			pair->src_eof = true;
diff -u -r xz-utils-5.0.0.orig/src/xz/file_io.h xz-utils-5.0.0/src/xz/file_io.h
--- xz-utils-5.0.0.orig/src/xz/file_io.h	2010-10-23 15:47:33.000000000 +0100
+++ xz-utils-5.0.0/src/xz/file_io.h	2013-05-02 13:49:01.132000005 +0100
@@ -61,6 +61,8 @@
 	/// Stat of the destination file.
 	struct stat dest_st;
 
+	/// Set by io_read() when --select-timeout expires; cleared after the sync flush.
+	bool select_timeout;
 } file_pair;
 
 
diff -u -r xz-utils-5.0.0.orig/src/xz/message.c xz-utils-5.0.0/src/xz/message.c
--- xz-utils-5.0.0.orig/src/xz/message.c	2010-10-23 15:47:33.000000000 +0100
+++ xz-utils-5.0.0/src/xz/message.c	2013-05-01 23:40:32.156000006 +0100
@@ -1149,6 +1149,13 @@
 
 	if (long_help) {
 		puts(_(
+"      --select-timeout=TIMEOUT\n"
+"                      flush compressed output (LZMA_SYNC_FLUSH) roughly every\n"
+"                      TIMEOUT milliseconds so it can be decoded while streaming"));
+	}
+
+	if (long_help) {
+		puts(_(
 "\n Custom filter chain for compression (alternative for using presets):"));
 
 #if defined(HAVE_ENCODER_LZMA1) || defined(HAVE_DECODER_LZMA1) \
diff -u -r xz-utils-5.0.0.orig/src/xz/xz.1 xz-utils-5.0.0/src/xz/xz.1
--- xz-utils-5.0.0.orig/src/xz/xz.1	2010-10-23 15:47:33.000000000 +0100
+++ xz-utils-5.0.0/src/xz/xz.1	2013-05-01 23:51:11.704000006 +0100
@@ -873,6 +873,14 @@
 Automatic adjusting is always disabled when creating raw streams
 .RB ( \-\-format=raw ).
 .TP
+.BI \-\-select\-timeout= timeout
+When compressing, flush the encoder with LZMA_SYNC_FLUSH roughly every
+.I timeout
+milliseconds, even if the input buffer is not full, so that everything
+read so far can be decompressed without waiting for more input.
+This is useful for streaming slowly growing input such as a log file.
+It cannot be used with \fB\-\-format=lzma\fR.
+.TP
 \fB\-T\fR \fIthreads\fR, \fB\-\-threads=\fIthreads
 Specify the number of worker threads to use.
 The actual number of threads can be less than

Reply via email to