Changeset: adec669722c0 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=adec669722c0
Modified Files:
tools/merovingian/daemon/multiplex-funnel.c
Branch: default
Log Message:
multiplex-funnel: support more than just Q_TABLE results
Add support for Q_TRANS, Q_UPDATE and Q_SCHEMA query types. No support
is implemented for Q_PREPARE (would require globally consistent exec ids
so near to impossible).
One can now perform create, insert, update, delete and select statements
over a multiplex funnel getting single answers to all queries (though
seeing weird results for update queries in their simplest form of
course). Transaction support was added because it is very simple,
however, I doubt if it really a good idea to use it with a cluster.
diffs (185 lines):
diff --git a/tools/merovingian/daemon/multiplex-funnel.c
b/tools/merovingian/daemon/multiplex-funnel.c
--- a/tools/merovingian/daemon/multiplex-funnel.c
+++ b/tools/merovingian/daemon/multiplex-funnel.c
@@ -387,7 +387,7 @@ multiplexQuery(multiplex *m, char *buf,
MapiHdl h;
mapi_int64 rlen;
int fcnt;
- char emptyres, isempty;
+ int qtype;
/* first send the query to all, such that we don't waste time
* waiting for each server to produce an answer, but wait for all of
@@ -421,9 +421,10 @@ multiplexQuery(multiplex *m, char *buf,
t = NULL;
rlen = 0;
fcnt = -1;
- emptyres = 0;
+ qtype = -1;
for (i = 0; i < m->dbcc; i++) {
h = m->dbcv[i]->hdl;
+ /* check for responses */
if (mapi_read_response(h) != MOK) {
t = mapi_result_error(h);
mnstr_printf(fout, "!node %s failed: %s\n",
@@ -432,6 +433,7 @@ multiplexQuery(multiplex *m, char *buf,
m->dbcv[i]->database, t ? t : "(no
error)");
break;
}
+ /* check for errors */
if ((t = mapi_result_error(h)) != NULL) {
mnstr_printf(fout, "!node %s failed: %s\n",
m->dbcv[i]->database, t);
@@ -439,60 +441,108 @@ multiplexQuery(multiplex *m, char *buf,
m->dbcv[i]->database, t);
break;
}
- isempty = 0;
- /* mapi return Q_PARSE for empty results */
- if (mapi_get_querytype(h) == Q_PARSE)
- emptyres = isempty = 1;
- if (emptyres && !isempty) {
+ /* check for result type consistency */
+ if (qtype == -1) {
+ qtype = mapi_get_querytype(h);
+ } else if (qtype != mapi_get_querytype(h)) {
t = "err"; /* for cleanup code below */
- mnstr_printf(fout, "!node %s returned a result while
previous "
- "did not\n", m->dbcv[i]->database);
- Mfprintf(stderr, "encountered mix of empty and
non-empty "
- "results\n");
+ mnstr_printf(fout, "!node %s returned a different type
of result "
+ "than the previous node\n",
m->dbcv[i]->database);
+ Mfprintf(stderr, "encountered mix of result types, "
+ "got %d, expected %d\n",
mapi_get_querytype(h), qtype);
break;
}
- if (isempty)
- continue;
- /* only support Q_TABLE, because appending is easy */
- if (mapi_get_querytype(h) != Q_TABLE) {
- t = "err"; /* for cleanup code below */
- mnstr_printf(fout, "!node %s returned a non-table
result\n",
- m->dbcv[i]->database);
- Mfprintf(stderr, "querytype != Q_TABLE for %s: %d\n",
- m->dbcv[i]->database,
mapi_get_querytype(h));
+
+ /* determine correctness based on headers */
+ switch (qtype) {
+ case Q_PARSE:
+ /* mapi returns Q_PARSE for empty results */
+ continue;
+ case Q_TABLE:
+ /* prepare easily appending of all results */
+ rlen += mapi_get_row_count(h);
+ if (fcnt == -1) {
+ fcnt = mapi_get_field_count(h);
+ } else if (mapi_get_field_count(h) != fcnt) {
+ t = "err"; /* for cleanup code below */
+ mnstr_printf(fout, "!node %s has
mismatch in result fields\n",
+ m->dbcv[i]->database);
+ Mfprintf(stderr, "mapi_get_field_count
inconsistent for %s: "
+ "got %d, expected %d\n",
+ m->dbcv[i]->database,
+
mapi_get_field_count(h), fcnt);
+ }
+ break;
+ case Q_UPDATE:
+ /* just pile up the update counts */
+ rlen += mapi_rows_affected(h);
+ break;
+ case Q_SCHEMA:
+ /* accept, just write ok lateron */
+ break;
+ case Q_TRANS:
+ /* just check all servers end up in the same
state */
+ if (fcnt == -1) {
+ fcnt =
mapi_get_autocommit(m->dbcv[i]->conn);
+ } else if (fcnt !=
mapi_get_autocommit(m->dbcv[i]->conn)) {
+ t = "err"; /* for cleanup code below */
+ mnstr_printf(fout, "!node %s has
mismatch in transaction state\n",
+ m->dbcv[i]->database);
+ Mfprintf(stderr, "mapi_get_autocommit
inconsistent for %s: "
+ "got %d, expected %d\n",
+ m->dbcv[i]->database,
+
mapi_get_autocommit(m->dbcv[i]->conn), fcnt);
+ }
+ break;
+ default:
+ t = "err"; /* for cleanup code below */
+ mnstr_printf(fout, "!node %s returned unhandled
result type\n",
+ m->dbcv[i]->database);
+ Mfprintf(stderr, "unhandled querytype for %s:
%d\n",
+ m->dbcv[i]->database,
mapi_get_querytype(h));
+ break;
+ }
+ if (t != NULL)
break;
- }
- rlen += mapi_get_row_count(h);
- if (fcnt == -1) {
- fcnt = mapi_get_field_count(h);
- } else {
- if (mapi_get_field_count(h) != fcnt) {
- t = "err"; /* for cleanup code below */
- mnstr_printf(fout, "!node %s has mismatch in
result fields\n",
- m->dbcv[i]->database);
- Mfprintf(stderr, "mapi_get_field_count
inconsistent for %s: "
- "got %d, expected %d\n",
- m->dbcv[i]->database,
- mapi_get_field_count(h), fcnt);
- break;
- }
- }
}
- if (t != NULL || emptyres) {
+
+ /* error or empty result, just end here */
+ if (t != NULL || qtype == Q_PARSE) {
mnstr_flush(fout);
for (i = 0; i < m->dbcc; i++)
mapi_close_handle(m->dbcv[i]->hdl);
return;
}
- /* Compose the header. For the table id, we just send 0, such that
- * we never get a close request. Steal headers from the first node. */
- mnstr_printf(fout, "&%d 0 " LLFMT " %d " LLFMT "\n", Q_TABLE, rlen,
fcnt, rlen);
- /* now read the answers, and write them directly to the client */
- for (i = 0; i < m->dbcc; i++) {
- h = m->dbcv[i]->hdl;
- while ((t = mapi_fetch_line(h)) != NULL)
- if (i == 0 || *t != '%') /* skip other server's headers
*/
- mnstr_printf(fout, "%s\n", t);
+
+ /* write output to client */
+ switch (qtype) {
+ case Q_TABLE:
+ /* Compose the header. For the table id, we just send
0,
+ * such that we never get a close request. Steal
headers
+ * from the first node. */
+ mnstr_printf(fout, "&%d 0 " LLFMT " %d " LLFMT "\n",
+ Q_TABLE, rlen, fcnt, rlen);
+ /* now read the answers, and write them directly to the
client */
+ for (i = 0; i < m->dbcc; i++) {
+ h = m->dbcv[i]->hdl;
+ while ((t = mapi_fetch_line(h)) != NULL)
+ if (i == 0 || *t != '%') /* skip other
server's headers */
+ mnstr_printf(fout, "%s\n", t);
+ }
+ break;
+ case Q_UPDATE:
+ /* Write a single header for all update counts, to sort
of
+ * complement the transparency created for Q_TABLE
results,
+ * but forget about last id data (wouldn't make sense if
+ * we'd emit multiple update counts either) */
+ mnstr_printf(fout, "&%d %lld -1", Q_UPDATE, rlen);
+ break;
+ case Q_SCHEMA:
+ mnstr_printf(fout, "&%d", Q_SCHEMA);
+ break;
+ case Q_TRANS:
+ mnstr_printf(fout, "&%d %c", Q_TRANS, fcnt ? 't' : 'f');
+ break;
}
mnstr_flush(fout);
/* finish up */
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list