Changeset: da0370e3f0b0 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=da0370e3f0b0
Modified Files:
common/stream/stream.c
Branch: Jul2012
Log Message:
Fix reading from .bz2 file created by pbzip2.
pbzip2 splits the file into chunks which are compressed
independently. These chunks are then concatenated into the resulting
.bz2 file. When reading such a file, each chunk is a separate "stream",
and the end of one stream may be followed by another. In other words, we
need to continue reading until we've reached the actual end-of-file,
not just the end-of-stream. To do that, we need to use the official bzip2
interface (BZ2_bzRead and friends) instead of the zlib-compatible
BZ2_bzread.
diffs (157 lines):
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -799,14 +799,57 @@ stream *open_gzwastream(const char *file
/* streams working on a bzip2-compressed disk file */
#ifdef HAVE_LIBBZ2
+struct bz {
+ BZFILE *b;
+ FILE *f;
+};
+
+static void
+stream_bzclose(stream *s)
+{
+ int err = BZ_OK;
+
+ if (s->stream_data.p) {
+ if (s->access == ST_READ)
+ BZ2_bzReadClose(&err, ((struct bz *)
s->stream_data.p)->b);
+ else
+ BZ2_bzWriteClose(&err, ((struct bz *)
s->stream_data.p)->b, 0, NULL, NULL);
+ fclose(((struct bz *) s->stream_data.p)->f);
+ free(s->stream_data.p);
+ }
+ s->stream_data.p = NULL;
+}
+
static ssize_t
stream_bzread(stream *s, void *buf, size_t elmsize, size_t cnt)
{
int size = (int) (elmsize * cnt);
-
- size = BZ2_bzread((BZFILE *) s->stream_data.p, buf, size);
- if (size)
- return size / elmsize;
+ int err;
+ void *punused;
+ int nunused;
+ char unused[BZ_MAX_UNUSED];
+
+ if (s->stream_data.p) {
+ size = BZ2_bzRead(&err, ((struct bz *) s->stream_data.p)->b,
buf, size);
+ if (err == BZ_STREAM_END) {
+ /* end of stream, but not necessarily end of
+ * file: get unused bits, close stream, and
+ * open again with the saved unused bits */
+ BZ2_bzReadGetUnused(&err, ((struct bz *)
s->stream_data.p)->b, &punused, &nunused);
+ if (err == BZ_OK &&
+ (nunused > 0 ||
+ !feof(((struct bz *) s->stream_data.p)->f))) {
+ if (nunused > 0)
+ memcpy(unused, punused, nunused);
+ BZ2_bzReadClose(&err, ((struct bz *)
s->stream_data.p)->b);
+ ((struct bz *) s->stream_data.p)->b =
BZ2_bzReadOpen(&err, ((struct bz *) s->stream_data.p)->f, 0, 0, unused,
nunused);
+ } else {
+ stream_bzclose(s);
+ }
+ }
+ if (err == BZ_OK)
+ return size / elmsize;
+ }
return 0;
}
@@ -814,27 +857,13 @@ static ssize_t
stream_bzwrite(stream *s, const void *buf, size_t elmsize, size_t cnt)
{
int size = (int) (elmsize * cnt);
+ int err;
if (size) {
- size = BZ2_bzwrite((BZFILE *) s->stream_data.p, (void *) buf,
size);
- return size / elmsize;
+ BZ2_bzWrite(&err, ((struct bz *) s->stream_data.p)->b, (void *)
buf, size);
+ if (err == BZ_OK)
+ return cnt;
}
- return cnt;
-}
-
-static void
-stream_bzclose(stream *s)
-{
- if (s->stream_data.p)
- BZ2_bzclose((BZFILE *) s->stream_data.p);
- s->stream_data.p = NULL;
-}
-
-static int
-stream_bzflush(stream *s)
-{
- if (s->access == ST_WRITE)
- BZ2_bzflush((BZFILE *) s->stream_data.p);
return 0;
}
@@ -842,17 +871,35 @@ static stream *
open_bzstream(const char *filename, const char *flags)
{
stream *s;
- BZFILE *fp;
-
- if ((s = create_stream(filename)) == NULL)
+ int err;
+ struct bz *bzp;
+
+ if ((bzp = malloc(sizeof(struct bz))) == NULL)
return NULL;
- if ((fp = BZ2_bzopen(filename, flags)) == NULL)
+ if ((s = create_stream(filename)) == NULL) {
+ free(bzp);
+ return NULL;
+ }
+ if ((bzp->f = fopen(filename, flags)) == NULL)
+ s->errnr = MNSTR_OPEN_ERROR;
+ if (strchr(flags, 'r') != NULL) {
+ bzp->b = BZ2_bzReadOpen(&err, bzp->f, 0, 0, NULL, 0);
+ s->access = ST_READ;
+ if (err == BZ_STREAM_END) {
+ BZ2_bzReadClose(&err, bzp->b);
+ bzp->b = NULL;
+ }
+ } else {
+ bzp->b = BZ2_bzWriteOpen(&err, bzp->f, 9, 0, 30);
+ s->access = ST_WRITE;
+ }
+ if (err != BZ_OK)
s->errnr = MNSTR_OPEN_ERROR;
s->read = stream_bzread;
s->write = stream_bzwrite;
s->close = stream_bzclose;
- s->flush = stream_bzflush;
- s->stream_data.p = (void *) fp;
+ s->flush = NULL;
+ s->stream_data.p = (void *) bzp;
return s;
}
@@ -865,7 +912,7 @@ open_bzrstream(const char *filename)
return NULL;
s->type = ST_BIN;
if (s->errnr == MNSTR_NO__ERROR &&
- BZ2_bzread((BZFILE *) s->stream_data.p, (void *) &s->byteorder,
sizeof(s->byteorder)) < (int) sizeof(s->byteorder)) {
+ stream_bzread(s, (void *) &s->byteorder, sizeof(s->byteorder), 1)
!= 1) {
stream_bzclose(s);
s->errnr = MNSTR_OPEN_ERROR;
}
@@ -882,7 +929,7 @@ open_bzwstream_(const char *filename, co
s->access = ST_WRITE;
s->type = ST_BIN;
if (s->errnr == MNSTR_NO__ERROR)
- BZ2_bzwrite((BZFILE *) s->stream_data.p, (void *)
&s->byteorder, sizeof(s->byteorder));
+ stream_bzwrite(s, (void *) &s->byteorder, sizeof(s->byteorder),
1);
return s;
}
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list