Changeset: 8bc0d05e7742 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/8bc0d05e7742
Added Files:
sql/backends/monet5/sql_copyinto.c
sql/backends/monet5/sql_copyinto.h
Modified Files:
monetdb5/modules/mal/tablet.c
monetdb5/modules/mal/tablet.h
sql/backends/monet5/CMakeLists.txt
sql/backends/monet5/sql.c
sql/backends/monet5/sql.h
sql/backends/monet5/sql_result.c
Branch: directappend
Log Message:
Move most of tablet.c over to the sql module
diffs (truncated from 4551 to 300 lines):
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -6,93 +6,15 @@
* Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
*/
-/*
- * Niels Nes, Martin Kersten
- *
- * Parallel bulk load for SQL
- * The COPY INTO command for SQL is heavily CPU bound, which means
- * that ideally we would like to exploit the multi-cores to do that
- * work in parallel.
- * Complicating factors are the initial record offset, the
- * possible variable length of the input, and the original sort order
- * that should preferable be maintained.
- *
- * The code below consists of a file reader, which breaks up the
- * file into chunks of distinct rows. Then multiple parallel threads
- * grab them, and break them on the field boundaries.
- * After all fields are identified this way, the columns are converted
- * and stored in the BATs.
- *
- * The threads get a reference to a private copy of the READERtask.
- * It includes a list of columns they should handle. This is a basis
- * to distributed cheap and expensive columns over threads.
- *
- * The file reader overlaps IO with updates of the BAT.
- * Also the buffer size of the block stream might be a little small for
- * this task (1MB). It has been increased to 8MB, which indeed improved.
- *
- * The work divider allocates subtasks to threads based on the
- * observed time spending so far.
- */
-
#include "monetdb_config.h"
#include "tablet.h"
-#include "str.h"
-#include "mapi_prompt.h"
#include <string.h>
-#include <ctype.h>
-
-#define MAXWORKERS 64
-#define MAXBUFFERS 2
-/* We restrict the row length to be 32MB for the time being */
-#define MAXROWSIZE(X) (X > 32*1024*1024 ? X : 32*1024*1024)
-
-static MT_Lock errorlock = MT_LOCK_INITIALIZER(errorlock);
-
-static BAT *
-void_bat_create(int adt, BUN nr)
-{
- BAT *b = COLnew(0, adt, nr, TRANSIENT);
-
- /* check for correct structures */
- if (b == NULL)
- return NULL;
- if ((b = BATsetaccess(b, BAT_APPEND)) == NULL) {
- return NULL;
- }
-
- /* disable all properties here */
- b->tsorted = false;
- b->trevsorted = false;
- b->tnosorted = 0;
- b->tnorevsorted = 0;
- b->tseqbase = oid_nil;
- b->tkey = false;
- b->tnokey[0] = 0;
- b->tnokey[1] = 0;
- return b;
-}
-
-void
-TABLETdestroy_format(Tablet *as)
-{
- BUN p;
- Column *fmt = as->format;
-
- for (p = 0; p < as->nr_attrs; p++) {
- if (fmt[p].c)
- BBPunfix(fmt[p].c->batCacheid);
- if (fmt[p].data)
- GDKfree(fmt[p].data);
- }
- GDKfree(fmt);
-}
static oid
-check_BATs(Tablet *as)
+check_BATs(OutputTable *as)
{
- Column *fmt = as->format;
+ OutputColumn *fmt = as->format;
BUN i = 0;
BUN cnt;
oid base;
@@ -122,130 +44,6 @@ check_BATs(Tablet *as)
return base;
}
-str
-TABLETcreate_bats(Tablet *as, BUN est)
-{
- Column *fmt = as->format;
- BUN i, nr = 0;
-
- for (i = 0; i < as->nr_attrs; i++) {
- if (fmt[i].skip)
- continue;
- fmt[i].c = void_bat_create(fmt[i].adt, est);
- if (!fmt[i].c) {
- while (i > 0) {
- if (!fmt[--i].skip)
- BBPreclaim(fmt[i].c);
- }
- throw(SQL, "copy", "Failed to create bat of size "
BUNFMT "\n", as->nr);
- }
- fmt[i].ci = bat_iterator_nolock(fmt[i].c);
- nr++;
- }
- if (!nr)
- throw(SQL, "copy", "At least one column should be read from the
input\n");
- return MAL_SUCCEED;
-}
-
-str
-TABLETcollect(BAT **bats, Tablet *as)
-{
- Column *fmt = as->format;
- BUN i, j;
- BUN cnt = 0;
-
- if (bats == NULL)
- throw(SQL, "copy", "Missing container");
- for (i = 0; i < as->nr_attrs && !cnt; i++)
- if (!fmt[i].skip)
- cnt = BATcount(fmt[i].c);
- for (i = 0, j = 0; i < as->nr_attrs; i++) {
- if (fmt[i].skip)
- continue;
- bats[j] = fmt[i].c;
- BBPfix(bats[j]->batCacheid);
- if ((fmt[i].c = BATsetaccess(fmt[i].c, BAT_READ)) == NULL)
- throw(SQL, "copy", "Failed to set access at tablet part
" BUNFMT "\n", cnt);
- fmt[i].c->tsorted = fmt[i].c->trevsorted = false;
- fmt[i].c->tkey = false;
- BATsettrivprop(fmt[i].c);
-
- if (cnt != BATcount(fmt[i].c))
- throw(SQL, "copy", "Count " BUNFMT " differs from "
BUNFMT "\n", BATcount(fmt[i].c), cnt);
- j++;
- }
- return MAL_SUCCEED;
-}
-
-str
-TABLETcollect_parts(BAT **bats, Tablet *as, BUN offset)
-{
- Column *fmt = as->format;
- BUN i, j;
- BUN cnt = 0;
-
- for (i = 0; i < as->nr_attrs && !cnt; i++)
- if (!fmt[i].skip)
- cnt = BATcount(fmt[i].c);
- for (i = 0, j = 0; i < as->nr_attrs; i++) {
- BAT *b, *bv = NULL;
- if (fmt[i].skip)
- continue;
- b = fmt[i].c;
- b->tsorted = b->trevsorted = false;
- b->tkey = false;
- BATsettrivprop(b);
- if ((b = BATsetaccess(b, BAT_READ)) == NULL) {
- fmt[i].c = NULL;
- throw(SQL, "copy", "Failed to set access at tablet part
" BUNFMT "\n", cnt);
- }
- bv = BATslice(b, (offset > 0) ? offset - 1 : 0, BATcount(b));
- bats[j] = bv;
-
- b->tkey = (offset > 0) ? FALSE : bv->tkey;
- b->tnonil &= bv->tnonil;
- if (b->tsorted != bv->tsorted)
- b->tsorted = false;
- if (b->trevsorted != bv->trevsorted)
- b->trevsorted = false;
- if (BATtdense(b))
- b->tkey = true;
- b->batDirtydesc = true;
-
- if (offset > 0) {
- BBPunfix(bv->batCacheid);
- bats[j] = BATslice(b, offset, BATcount(b));
- }
- if (cnt != BATcount(b))
- throw(SQL, "copy", "Count " BUNFMT " differs from "
BUNFMT "\n", BATcount(b), cnt);
- j++;
- }
- return MAL_SUCCEED;
-}
-
-// the starting quote character has already been skipped
-
-static char *
-tablet_skip_string(char *s, char quote, bool escape)
-{
- size_t i = 0, j = 0;
- while (s[i]) {
- if (escape && s[i] == '\\' && s[i + 1] != '\0')
- s[j++] = s[i++];
- else if (s[i] == quote) {
- if (s[i + 1] != quote)
- break;
- i++; /* skip the first quote
*/
- }
- s[j++] = s[i++];
- }
- assert(s[i] == quote || s[i] == '\0');
- if (s[i] == 0)
- return NULL;
- s[j] = 0;
- return s + i;
-}
-
static int
TABLET_error(stream *s)
{
@@ -260,7 +58,7 @@ TABLET_error(stream *s)
with UDP, where you may loose most of the information using short writes
*/
static inline int
-output_line(char **buf, size_t *len, char **localbuf, size_t *locallen, Column
*fmt, stream *fd, BUN nr_attrs, oid id)
+output_line(char **buf, size_t *len, char **localbuf, size_t *locallen,
OutputColumn *fmt, stream *fd, BUN nr_attrs, oid id)
{
BUN i;
ssize_t fill = 0;
@@ -274,7 +72,7 @@ output_line(char **buf, size_t *len, cha
}
if (i == nr_attrs) {
for (i = 0; i < nr_attrs; i++) {
- Column *f = fmt + i;
+ OutputColumn *f = fmt + i;
const char *p;
ssize_t l;
@@ -312,13 +110,13 @@ output_line(char **buf, size_t *len, cha
}
static inline int
-output_line_dense(char **buf, size_t *len, char **localbuf, size_t *locallen,
Column *fmt, stream *fd, BUN nr_attrs)
+output_line_dense(char **buf, size_t *len, char **localbuf, size_t *locallen,
OutputColumn *fmt, stream *fd, BUN nr_attrs)
{
BUN i;
ssize_t fill = 0;
for (i = 0; i < nr_attrs; i++) {
- Column *f = fmt + i;
+ OutputColumn *f = fmt + i;
const char *p;
ssize_t l;
@@ -356,12 +154,12 @@ output_line_dense(char **buf, size_t *le
}
static inline int
-output_line_lookup(char **buf, size_t *len, Column *fmt, stream *fd, BUN
nr_attrs, oid id)
+output_line_lookup(char **buf, size_t *len, OutputColumn *fmt, stream *fd, BUN
nr_attrs, oid id)
{
BUN i;
for (i = 0; i < nr_attrs; i++) {
- Column *f = fmt + i;
+ OutputColumn *f = fmt + i;
if (f->c) {
const void *p = BUNtail(f->ci, id - f->c->hseqbase);
@@ -383,79 +181,8 @@ output_line_lookup(char **buf, size_t *l
return 0;
}
-/* returns TRUE if there is/might be more */
-static bool
-tablet_read_more(bstream *in, stream *out, size_t n)
-{
- if (out) {
- do {
- /* query is not finished ask for more */
- /* we need more query text */
- if (bstream_next(in) < 0)
- return false;
- if (in->eof) {
- if (mnstr_write(out, PROMPT2, sizeof(PROMPT2) -
1, 1) == 1)
- mnstr_flush(out, MNSTR_FLUSH_DATA);
- in->eof = false;
- /* we need more query text */
- if (bstream_next(in) <= 0)
- return false;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list