Changeset: cd474532e378 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=cd474532e378
Modified Files:
common/stream/stream.c
common/stream/stream.h
Branch: fixed-width-format
Log Message:
new fwf stream wrapper
diffs (160 lines):
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -4671,3 +4671,146 @@ stream * stream_blackhole_create (void)
s->access = ST_WRITE;
return s;
}
+
+
+/* fixed-width format streams */
+
+#define STREAM_FWF_NAME "fwf"	/* name given to create_stream(); read/close compare s->name against it */
+#define STREAM_FWF_FIELD_SEP '|'	/* written to the output after every converted field */
+#define STREAM_FWF_ESCAPE '\\'	/* written before a field separator that occurs inside a field */
+#define STREAM_FWF_RECORD_SEP '\n'	/* written after the last field of every converted record */
+
+/* Private state of a fixed-width-format stream (kept in stream_data.p). */
+typedef struct {
+	stream *s;	/* wrapped stream the raw fixed-width records are read from */
+	// config
+	size_t num_fields;	/* number of fields in one record */
+	size_t *widths;	/* widths[i] is the width in bytes of field i */
+	char filler;	/* padding character stripped from both ends of a field */
+	// state
+	size_t line_len;	/* bytes read per record: sum of widths plus one for the newline */
+	char* in_buf;	/* holds one raw record of line_len bytes */
+	char* out_buf;	/* holds the converted form of the current record */
+	size_t out_buf_start;	/* offset in out_buf of the first byte not yet delivered */
+	size_t out_buf_remaining;	/* number of bytes in out_buf not yet delivered */
+} stream_fwf_data;
+
+
+/*
+ * Read callback of the fixed-width-format stream.
+ *
+ * Reads whole records of fsd->line_len bytes from the wrapped stream,
+ * strips the filler character from both ends of every field and emits
+ * each field followed by STREAM_FWF_FIELD_SEP; every record is closed
+ * with STREAM_FWF_RECORD_SEP.  A STREAM_FWF_FIELD_SEP found inside a
+ * field is preceded by STREAM_FWF_ESCAPE.  A field that consists only
+ * of filler becomes an empty field.
+ *
+ * s        the fwf stream (its name must be STREAM_FWF_NAME)
+ * buf      destination for the converted bytes
+ * elmsize  must be 1
+ * cnt      number of bytes requested
+ *
+ * Returns the number of bytes stored in buf.  This is less than cnt
+ * only when the wrapped stream delivered fewer than line_len bytes for
+ * a record; such a short record is dropped.  Returns -1 when s is not
+ * an fwf stream or elmsize != 1, and passes on a negative result of
+ * the wrapped stream's read unchanged.
+ */
+static ssize_t
+stream_fwf_read(stream *s, void *buf, size_t elmsize, size_t cnt)
+{
+	stream_fwf_data *fsd;
+	size_t to_write = cnt;
+	size_t buf_written = 0;
+
+	if (strcmp(s->name, STREAM_FWF_NAME) != 0 || elmsize != 1) {
+		return -1;
+	}
+	fsd = (stream_fwf_data *) s->stream_data.p;
+
+	while (to_write > 0) {
+		size_t chunk;
+
+		if (fsd->out_buf_remaining == 0) {
+			/* nothing pending: fetch and convert the next record */
+			size_t field_idx, in_buf_pos = 0, out_buf_pos = 0;
+			ssize_t actually_read = fsd->s->read(fsd->s, fsd->in_buf, 1, fsd->line_len);
+
+			if (actually_read < 0) {
+				return actually_read;	/* error from the wrapped stream */
+			}
+			if ((size_t) actually_read < fsd->line_len) {
+				return (ssize_t) buf_written;	/* incomplete last record: skip it */
+			}
+			/*
+			 * Worst case this writes 2 * sum(widths) + num_fields + 1
+			 * bytes into out_buf (every byte escaped, one separator
+			 * per field, one record separator).
+			 */
+			for (field_idx = 0; field_idx < fsd->num_fields; field_idx++) {
+				size_t beg = in_buf_pos;
+				size_t end = beg + fsd->widths[field_idx];	/* one past the field */
+
+				in_buf_pos = end;
+				/*
+				 * Trim with explicit bounds so that an all-filler
+				 * (or zero-width) field cannot walk out of the field
+				 * or out of in_buf.
+				 */
+				while (beg < end && fsd->in_buf[beg] == fsd->filler) {
+					beg++;
+				}
+				while (end > beg && fsd->in_buf[end - 1] == fsd->filler) {
+					end--;
+				}
+				for (; beg < end; beg++) {
+					if (fsd->in_buf[beg] == STREAM_FWF_FIELD_SEP) {
+						fsd->out_buf[out_buf_pos++] = STREAM_FWF_ESCAPE;
+					}
+					fsd->out_buf[out_buf_pos++] = fsd->in_buf[beg];
+				}
+				fsd->out_buf[out_buf_pos++] = STREAM_FWF_FIELD_SEP;
+			}
+			fsd->out_buf[out_buf_pos++] = STREAM_FWF_RECORD_SEP;
+			fsd->out_buf_remaining = out_buf_pos;
+			fsd->out_buf_start = 0;
+		}
+
+		/* out_buf now holds pending bytes: hand over as many as fit */
+		chunk = fsd->out_buf_remaining <= to_write ? fsd->out_buf_remaining : to_write;
+		memcpy((char *) buf + buf_written, fsd->out_buf + fsd->out_buf_start, chunk);
+		fsd->out_buf_start += chunk;
+		fsd->out_buf_remaining -= chunk;
+		buf_written += chunk;
+		to_write -= chunk;
+	}
+	return (ssize_t) cnt;
+}
+
+
+/*
+ * Close callback of the fixed-width-format stream.
+ *
+ * When s carries the fwf name, the wrapped stream is closed and the
+ * widths array, both record buffers and the state block are freed.
+ * The stream object itself is destroyed in every case.
+ */
+static void
+stream_fwf_close(stream *s)
+{
+	stream_fwf_data *state;
+
+	if (strcmp(s->name, STREAM_FWF_NAME) != 0) {
+		/* not one of ours: there is no private state to release */
+		destroy(s);
+		return;
+	}
+
+	state = (stream_fwf_data *) s->stream_data.p;
+	state->s->close(state->s);
+	free(state->out_buf);
+	free(state->in_buf);
+	free(state->widths);
+	free(state);
+	destroy(s);
+}
+
+/*
+ * Create a read-only stream that converts fixed-width records read
+ * from s into separator-delimited records.
+ *
+ * s           stream delivering the raw records
+ * num_fields  number of fields per record
+ * widths      array of num_fields field widths in bytes
+ * filler      padding character stripped from both ends of each field
+ *
+ * Every input record is expected to be sum(widths) bytes followed by a
+ * one-byte newline.
+ *
+ * Returns the new stream, or NULL when an argument is invalid, the
+ * buffer sizes would overflow size_t, or an allocation fails.  On
+ * failure nothing passed in is freed or closed; on success widths is
+ * stored in the stream state, which stream_fwf_close() frees.
+ */
+stream*
+stream_fwf_create (stream *s, size_t num_fields, size_t *widths, char filler)
+{
+	const size_t size_max = (size_t) -1;
+	stream *ns;
+	stream_fwf_data *fsd;
+	size_t i, payload = 0, out_buf_len;
+
+	if (s == NULL || (widths == NULL && num_fields > 0)) {
+		return NULL;
+	}
+	for (i = 0; i < num_fields; i++) {
+		if (widths[i] > size_max - payload) {
+			return NULL;	/* record length does not fit in size_t */
+		}
+		payload += widths[i];
+	}
+	/*
+	 * Worst-case size of one converted record: every input byte is a
+	 * field separator and gets an escape (2 * payload), plus one field
+	 * separator per field, plus the record separator.
+	 */
+	if (num_fields >= size_max || payload > (size_max - 1 - num_fields) / 2) {
+		return NULL;
+	}
+	out_buf_len = 2 * payload + num_fields + 1;
+
+	fsd = malloc(sizeof(*fsd));
+	if (fsd == NULL) {
+		return NULL;
+	}
+	fsd->s = s;
+	fsd->num_fields = num_fields;
+	fsd->widths = widths;
+	fsd->filler = filler;
+	fsd->line_len = payload + 1;	/* fields plus the trailing newline */
+	fsd->out_buf = NULL;
+	fsd->out_buf_start = 0;
+	fsd->out_buf_remaining = 0;
+
+	fsd->in_buf = malloc(fsd->line_len);
+	if (fsd->in_buf == NULL) {
+		goto fail;
+	}
+	fsd->out_buf = malloc(out_buf_len);
+	if (fsd->out_buf == NULL) {
+		goto fail;
+	}
+	if ((ns = create_stream(STREAM_FWF_NAME)) == NULL) {
+		goto fail;
+	}
+	ns->read = stream_fwf_read;
+	ns->close = stream_fwf_close;
+	ns->write = NULL;
+	ns->flush = NULL;
+	ns->access = ST_READ;
+	ns->stream_data.p = fsd;
+	return ns;
+
+fail:
+	/* widths and s stay with the caller when creation fails */
+	free(fsd->out_buf);
+	free(fsd->in_buf);
+	free(fsd);
+	return NULL;
+}
+
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -254,4 +254,6 @@ stream_export stream *callback_stream(
stream_export stream* stream_blackhole_create(void);
+/*
+ * Wrap s in a read-only stream that converts fixed-width records
+ * (num_fields fields of widths[i] bytes each, padded with filler) into
+ * records whose fields are each followed by '|' and that end in '\n'.
+ * Returns NULL on failure.  Closing the returned stream also closes s
+ * and frees widths.
+ */
+stream_export stream* stream_fwf_create(stream *s, size_t num_fields, size_t *widths, char filler);
+
#endif /*_STREAM_H_*/
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list