Hi,
I think I now have a first version of a patch to speed up osm2pgsql by
parallelizing certain parts of it. Specifically it parallelizes the
stages of "Going over pending ways" / "Going over pending relations".
Previously osm2pgsql would fetch all ways / relations that are marked as
"pending" and process them one by one. With this patch osm2pgsql spawns
a (configurable) number of workers that go through the list in
parallel. If osm2pgsql is given enough cache, these stages are mostly
CPU-bound during the initial import, so parallelization gives a
near-linear speedup. If not enough cache is available, a speedup is
still possible, although it then depends heavily on the disk
subsystem.
As I only have a laptop available, I can't really do any extensive
(performance) testing on the patch, so I can't say how much speedup it
will give in reality.
On 01/-10/-28163 12:59 PM, Frederik Ramm wrote:
>
>
> Have you considered multiprocessing (i.e. fork) instead of
> multithreading - would this perhaps make these things go away elegantly?
> Personally I abhor multithreading for the complexity it brings at
> (usually) little gain compared to simply forking a few worker processes
> but of course YMMV especially if you want tight communication between
> workers.
>
In this version of the patch, I did choose the route of forking, which
was indeed considerably easier than figuring out which parts are and
are not thread-safe.
I have done a bunch of testing with the patch and so far it looks like
everything is working. However, as I am not 100% sure, I didn't just
want to commit this patch to svn. Instead, it would be great if someone
could review the patch or test it, to make sure it works fully. I have
attached the patch to this email.
Any feedback or suggestions are welcome.
Kai
diff --git a/configure.ac b/configure.ac
index b6e0758..8389807 100644
--- a/configure.ac
+++ b/configure.ac
@@ -19,6 +19,8 @@ AC_CONFIG_MACRO_DIR([m4])
dnl Generate configuration header file
AC_CONFIG_HEADER(config.h)
+AC_HEADER_SYS_WAIT
+
dnl Find C compiler
AC_PROG_CC_C99
diff --git a/middle-pgsql.c b/middle-pgsql.c
index 28cb2f8..c09e2fb 100644
--- a/middle-pgsql.c
+++ b/middle-pgsql.c
@@ -19,6 +19,9 @@
#ifdef HAVE_PTHREAD
#include <pthread.h>
#endif
+#ifdef HAVE_SYS_WAIT_H
+#include <sys/wait.h>
+#endif
#include <libpq-fe.h>
@@ -56,6 +59,7 @@ struct table_desc {
const char *array_indexes;
int copyMode; /* True if we are in copy mode */
+ int transactionMode; /* True if we are in an extended transaction */
PGconn *sql_conn;
};
@@ -196,6 +200,8 @@ int nodesCacheHits, nodesCacheLookups;
static int Append;
+const struct output_options *out_options;
+
static inline int id2block(osmid_t id)
{
// + NUM_BLOCKS/2 allows for negative IDs
@@ -410,6 +416,35 @@ int pgsql_ram_nodes_get(struct osmNode *out, osmid_t id)
return 0;
}
+/**
+ * Open one PostgreSQL connection per entry in tables[] and bring each
+ * connection into the state the middle layer works in: run the table's
+ * prepare / prepare_intarray statements if it has them, and enter COPY
+ * mode if it has a copy statement.
+ *
+ * pgsql_iterate_ways() and pgsql_iterate_relations() call this in a forked
+ * worker process, which needs database connections of its own.
+ *
+ * @param options  Output options; only options->conninfo is read here.
+ * @return 0 once every table has been processed. A failed connection is
+ *         reported on stderr and passed to exit_nicely().
+ */
+static int pgsql_connect(const struct output_options *options) {
+    int i;
+
+    /* We use a connection per table to enable the use of COPY */
+    for (i = 0; i < num_tables; i++) {
+        PGconn *sql_conn = PQconnectdb(options->conninfo);
+
+        /* Check to see that the backend connection was successfully made */
+        if (PQstatus(sql_conn) != CONNECTION_OK) {
+            fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(sql_conn));
+            exit_nicely();
+        }
+        tables[i].sql_conn = sql_conn;
+
+        if (tables[i].prepare) {
+            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare);
+        }
+
+        if (tables[i].prepare_intarray) {
+            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare_intarray);
+        }
+
+        if (tables[i].copy) {
+            pgsql_exec(sql_conn, PGRES_COPY_IN, "%s", tables[i].copy);
+            tables[i].copyMode = 1;
+        }
+    }
+
+    /* The function is declared int, so it must not fall off the end. */
+    return 0;
+}
+
static void pgsql_cleanup(void)
{
int i;
@@ -961,8 +996,10 @@ static int pgsql_ways_delete(osmid_t osm_id)
static void pgsql_iterate_ways(int (*callback)(osmid_t id, struct keyval *tags, struct osmNode *nodes, int count, int exists))
{
+ int noProcs = out_options->num_procs;
+ int pid = 0;
PGresult *res_ways;
- int i, count = 0;
+ int i, p, count = 0;
/* The flag we pass to indicate that the way in question might exist already in the database */
int exists = Append;
@@ -975,8 +1012,31 @@ static void pgsql_iterate_ways(int (*callback)(osmid_t id, struct keyval *tags,
res_ways = pgsql_execPrepared(way_table->sql_conn, "pending_ways", 0, NULL, PGRES_TUPLES_OK);
+
+ /**
+ * To speed up processing of pending ways, fork noProcs worker processes
+ * each of which independently goes through an equal subset of the pending ways array
+ */
+ fprintf(stderr, "\nUsing %i helper-processes\n", noProcs);
+ for (p = 1; p < noProcs; p++) {
+ pid=fork();
+ if (pid==0) break;
+ if (pid==-1) {
+ exit_nicely();
+ }
+ }
+ if ((pid == 0) && (noProcs > 1)) {
+ /* After forking, need to reconnect to the postgresql db */
+ out_options->out->connect(out_options);
+ pgsql_connect(out_options);
+ } else {
+ p = 0;
+ }
+
//fprintf(stderr, "\nIterating ways\n");
- for (i = 0; i < PQntuples(res_ways); i++) {
+ /* Use a stride length of the number of worker processes,
+ starting with an offset for each worker process p */
+ for (i = p; i < PQntuples(res_ways); i+= noProcs) {
osmid_t id = strtoosmid(PQgetvalue(res_ways, i, 0), NULL, 10);
struct keyval tags;
struct osmNode *nodes;
@@ -998,12 +1058,22 @@ static void pgsql_iterate_ways(int (*callback)(osmid_t id, struct keyval *tags,
free(nodes);
resetList(&tags);
}
+ if ((pid == 0) && (noProcs > 1)) {
+ pgsql_cleanup();
+ out_options->out->close();
+ exit(0);
+ } else {
+ for (p = 0; p < noProcs; p++) wait(NULL);
+ fprintf(stderr, "\nAll child processes exited\n");
+ }
- PQclear(res_ways);
+
fprintf(stderr, "\n");
time(&end);
if (end - start > 0)
- fprintf(stderr, "Pending ways took %ds at a rate of %.2f/s\n",(int)(end - start), ((double)count / (double)(end - start)));
+ fprintf(stderr, "Pending ways took %ds at a rate of %.2f/s\n",(int)(end - start),
+ ((double)PQntuples(res_ways) / (double)(end - start)));
+ PQclear(res_ways);
}
static int pgsql_way_changed(osmid_t osm_id)
@@ -1169,7 +1239,9 @@ static int pgsql_rels_delete(osmid_t osm_id)
static void pgsql_iterate_relations(int (*callback)(osmid_t id, struct member *members, int member_count, struct keyval *tags, int exists))
{
PGresult *res_rels;
- int i, count = 0;
+ int noProcs = out_options->num_procs;
+ int pid;
+ int i, p, count = 0;
/* The flag we pass to indicate that the way in question might exist already in the database */
int exists = Append;
@@ -1182,8 +1254,23 @@ static void pgsql_iterate_relations(int (*callback)(osmid_t id, struct member *m
res_rels = pgsql_execPrepared(rel_table->sql_conn, "pending_rels", 0, NULL, PGRES_TUPLES_OK);
+ fprintf(stderr, "\nUsing %i helper-processes\n", noProcs);
+ for (p = 1; p < noProcs; p++) {
+ pid=fork();
+ if (pid==0) break;
+ if (pid==-1) {
+ exit_nicely();
+ }
+ }
+ if ((pid == 0) && (noProcs > 1)) {
+ out_options->out->connect(out_options);
+ pgsql_connect(out_options);
+ } else {
+ p = 0;
+ }
+
//fprintf(stderr, "\nIterating ways\n");
- for (i = 0; i < PQntuples(res_rels); i++) {
+ for (i = p; i < PQntuples(res_rels); i+= noProcs) {
osmid_t id = strtoosmid(PQgetvalue(res_rels, i, 0), NULL, 10);
struct keyval tags;
struct member *members;
@@ -1206,6 +1293,15 @@ static void pgsql_iterate_relations(int (*callback)(osmid_t id, struct member *m
resetList(&tags);
}
+ if ((pid == 0) && (noProcs > 1)) {
+ pgsql_cleanup();
+ out_options->out->close();
+ exit(0);
+ } else {
+ for (p = 0; p < noProcs; p++) wait(NULL);
+ fprintf(stderr, "\nAll child processes exited\n");
+ }
+
PQclear(res_rels);
fprintf(stderr, "\n");
time(&end);
@@ -1247,8 +1343,9 @@ static void pgsql_end(void)
PGconn *sql_conn = tables[i].sql_conn;
// Commit transaction
- if (tables[i].stop) {
+ if (tables[i].stop && tables[i].transactionMode) {
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].stop);
+ tables[i].transactionMode = 0;
}
}
@@ -1334,6 +1431,7 @@ static inline void set_prefix_and_tbls(const char *prefix, const char *t, const
static int build_indexes;
+
static int pgsql_start(const struct output_options *options)
{
PGresult *res;
@@ -1342,6 +1440,8 @@ static int pgsql_start(const struct output_options *options)
scale = options->scale;
Append = options->append;
+
+ out_options = options;
/* How much we can fit, and make sure it's odd */
maxBlocks = (options->cache*((1024*1024)/(PER_BLOCK*sizeof(struct ramNode)))) | 1;
@@ -1433,6 +1533,7 @@ static int pgsql_start(const struct output_options *options)
if (tables[i].start) {
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].start);
+ tables[i].transactionMode = 1;
}
if (dropcreate && tables[i].create) {
@@ -1460,16 +1561,30 @@ static int pgsql_start(const struct output_options *options)
return 0;
}
+/**
+ * Commit hook of mid_pgsql. For every middle table this calls
+ * pgsql_endCopy() and then, if the table has a stop statement and is still
+ * flagged as being inside an extended transaction, executes that statement
+ * and clears transactionMode so it is not issued again for the same
+ * transaction.
+ */
+static void pgsql_commit(void) {
+    int idx;
+
+    for (idx = 0; idx < num_tables; idx++) {
+        struct table_desc *table = &tables[idx];
+        PGconn *conn = table->sql_conn;
+
+        pgsql_endCopy(table);
+
+        /* Nothing to commit unless a transaction is open on this table. */
+        if (!table->stop || !table->transactionMode)
+            continue;
+
+        pgsql_exec(conn, PGRES_COMMAND_OK, "%s", table->stop);
+        table->transactionMode = 0;
+    }
+}
+
static void *pgsql_stop_one(void *arg)
{
+ time_t start, end;
+
struct table_desc *table = arg;
PGconn *sql_conn = table->sql_conn;
fprintf(stderr, "Stopping table: %s\n", table->name);
pgsql_endCopy(table);
- if (table->stop)
- pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", table->stop);
-
+ //if (table->stop)
+ // pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", table->stop);
+ time(&start);
if (build_indexes && table->array_indexes) {
char *buffer = (char *) malloc(strlen(table->array_indexes) + 99);
// we need to insert before the TABLESPACE setting, if any
@@ -1496,7 +1611,8 @@ static void *pgsql_stop_one(void *arg)
}
PQfinish(sql_conn);
table->sql_conn = NULL;
- fprintf(stderr, "Stopped table: %s\n", table->name);
+ time(&end);
+ fprintf(stderr, "Stopped table: %s in %is\n", table->name, (int)(end - start));
return NULL;
}
@@ -1541,6 +1657,7 @@ struct middle_t mid_pgsql = {
.cleanup = pgsql_cleanup,
.analyze = pgsql_analyze,
.end = pgsql_end,
+ .commit = pgsql_commit,
.nodes_set = pgsql_nodes_set,
#if 0
diff --git a/middle-ram.c b/middle-ram.c
index e3b7116..d3d7969 100644
--- a/middle-ram.c
+++ b/middle-ram.c
@@ -402,12 +402,16 @@ static void ram_stop(void)
}
}
+/* Commit hook of mid_ram. The body is empty, so calling it has no effect. */
+static void ram_commit(void)
+{
+}
+
struct middle_t mid_ram = {
.start = ram_start,
.stop = ram_stop,
.end = ram_end,
.cleanup = ram_stop,
.analyze = ram_analyze,
+ .commit = ram_commit,
.nodes_set = ram_nodes_set,
#if 0
.nodes_get = ram_nodes_get,
diff --git a/middle.h b/middle.h
index c3e11b2..4db7fb7 100644
--- a/middle.h
+++ b/middle.h
@@ -19,6 +19,7 @@ struct middle_t {
void (*cleanup)(void);
void (*analyze)(void);
void (*end)(void);
+ void (*commit)(void);
int (*nodes_set)(osmid_t id, double lat, double lon, struct keyval *tags);
int (*nodes_get_list)(struct osmNode *out, osmid_t *nds, int nd_count);
diff --git a/osm2pgsql.c b/osm2pgsql.c
index b64d2d3..e5366e0 100644
--- a/osm2pgsql.c
+++ b/osm2pgsql.c
@@ -192,6 +192,8 @@ static void long_usage(char *arg0)
printf(" -K|--keep-coastlines\tKeep coastline data rather than filtering it out.\n");
printf(" \t\tBy default natural=coastline tagged data will be discarded based on the\n");
printf(" \t\tassumption that post-processed Coastline Checker shapefiles will be used.\n");
+ printf(" --number-processes\t\tSpecifies the number of parallel processes used for certain operations\n");
+ printf(" \t\tDefault is 2\n");
printf(" -I|--disable-parallel-indexing\t\tDisable indexing all tables concurrently.\n");
printf(" --alloc-chunk\t\tAllocate node cache in chunks rather than as a whole.\n");
printf(" -h|--help\t\tHelp information.\n");
@@ -327,6 +329,7 @@ int main(int argc, char *argv[])
int enable_multi = 0;
int parallel_indexing = 1;
int alloc_chunkwise = 0;
+ int num_procs = 2;
const char *expire_tiles_filename = "dirty_tiles";
const char *db = "gis";
const char *username=NULL;
@@ -394,6 +397,7 @@ int main(int argc, char *argv[])
{"version", 0, 0, 'V'},
{"disable-parallel-indexing", 0, 0, 'I'},
{"alloc-chunk", 0, 0, 204},
+ {"number-processes", 1, 0, 205},
{0, 0, 0, 0}
};
@@ -451,6 +455,7 @@ int main(int argc, char *argv[])
#endif
break;
case 204: alloc_chunkwise=1; break;
+ case 205: num_procs = atoi(optarg); break;
case 'V': exit(EXIT_SUCCESS);
case '?':
default:
@@ -476,6 +481,8 @@ int main(int argc, char *argv[])
if (cache < 0) cache = 0;
+ if (num_procs < 1) num_procs = 1;
+
if (pass_prompt)
password = simple_prompt("Password:", 100, 0);
else {
@@ -530,6 +537,7 @@ int main(int argc, char *argv[])
options.keep_coastlines = keep_coastlines;
options.parallel_indexing = parallel_indexing;
options.alloc_chunkwise = alloc_chunkwise;
+ options.num_procs = num_procs;
if (strcmp("pgsql", output_backend) == 0) {
osmdata.out = &out_pgsql;
@@ -541,6 +549,7 @@ int main(int argc, char *argv[])
fprintf(stderr, "Output backend `%s' not recognised. Should be one of [pgsql, gazetteer, null].\n", output_backend);
exit(EXIT_FAILURE);
}
+ options.out = osmdata.out;
if (strcmp("auto", input_reader) != 0) {
if (strcmp("libxml2", input_reader) == 0) {
diff --git a/output-pgsql.c b/output-pgsql.c
index 954405f..8234536 100644
--- a/output-pgsql.c
+++ b/output-pgsql.c
@@ -1138,6 +1138,22 @@ static int pgsql_out_relation(osmid_t id, struct keyval *rel_tags, struct osmNod
return 0;
}
+/**
+ * Open a PostgreSQL connection for each of the NUM_TABLES output tables,
+ * store it in tables[i].sql_conn and start a transaction ("BEGIN") on it.
+ * Installed as the .connect hook of out_pgsql.
+ *
+ * @param options  Output options; only options->conninfo is read here.
+ * @return 0 once every table has been processed. A failed connection is
+ *         reported on stderr and passed to exit_nicely().
+ */
+static int pgsql_out_connect(const struct output_options *options) {
+    int i;
+
+    for (i = 0; i < NUM_TABLES; i++) {
+        PGconn *sql_conn = PQconnectdb(options->conninfo);
+
+        /* Check to see that the backend connection was successfully made */
+        if (PQstatus(sql_conn) != CONNECTION_OK) {
+            fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(sql_conn));
+            exit_nicely();
+        }
+        tables[i].sql_conn = sql_conn;
+        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "BEGIN");
+    }
+
+    /* The hook is declared to return int, so it must not fall off the end. */
+    return 0;
+}
+
static int pgsql_out_start(const struct output_options *options)
{
char *sql, tmp[256];
@@ -1274,8 +1290,8 @@ static int pgsql_out_start(const struct output_options *options)
PQclear(res);
/* change the type of the geometry column if needed - this can only change to a more permisive type */
- pgsql_exec(sql_conn, PGRES_COMMAND_OK, "UPDATE geometry_columns SET type = '%s' where type != '%s' and f_table_name = '%s' and f_geometry_column = 'way'",
- tables[i].type, tables[i].type, tables[i].name);
+ // pgsql_exec(sql_conn, PGRES_COMMAND_OK, "UPDATE geometry_columns SET type = '%s' where type != '%s' and f_table_name = '%s' and f_geometry_column = 'way'",
+ // tables[i].type, tables[i].type, tables[i].name);
}
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "PREPARE get_way (" POSTGRES_OSMID_TYPE ") AS SELECT ST_AsText(way) FROM %s WHERE osm_id = $1;\n", tables[i].name);
@@ -1343,6 +1359,27 @@ static void pgsql_pause_copy(struct s_table *table)
table->copyMode = 0;
}
+/**
+ * Close hook of out_pgsql. For each of the NUM_TABLES output tables this
+ * calls pgsql_pause_copy(), issues COMMIT, closes the connection with
+ * PQfinish() and clears the stored connection handle.
+ */
+static void pgsql_out_close(void) {
+    struct s_table *table;
+
+    for (table = tables; table < tables + NUM_TABLES; table++) {
+        pgsql_pause_copy(table);
+
+        /* Commit transaction */
+        pgsql_exec(table->sql_conn, PGRES_COMMAND_OK, "COMMIT");
+
+        PQfinish(table->sql_conn);
+        table->sql_conn = NULL;
+    }
+}
+
+/**
+ * For each of the NUM_TABLES output tables: call pgsql_pause_copy(), log
+ * the table name to stderr and issue COMMIT on its connection. The
+ * connections are left open.
+ */
+static void pgsql_out_commit(void) {
+    int idx = 0;
+
+    while (idx < NUM_TABLES) {
+        struct s_table *table = &tables[idx++];
+
+        pgsql_pause_copy(table);
+
+        /* Commit transaction */
+        fprintf(stderr, "Committing transaction for %s\n", table->name);
+        pgsql_exec(table->sql_conn, PGRES_COMMAND_OK, "COMMIT");
+    }
+}
+
static void *pgsql_out_stop_one(void *arg)
{
struct s_table *table = arg;
@@ -1356,8 +1393,8 @@ static void *pgsql_out_stop_one(void *arg)
pgsql_pause_copy(table);
// Commit transaction
- fprintf(stderr, "Committing transaction for %s\n", table->name);
- pgsql_exec(sql_conn, PGRES_COMMAND_OK, "COMMIT");
+ //fprintf(stderr, "Committing transaction for %s\n", table->name);
+ //pgsql_exec(sql_conn, PGRES_COMMAND_OK, "COMMIT");
if (!Options->append)
{
time_t start, end;
@@ -1395,6 +1432,7 @@ static void *pgsql_out_stop_one(void *arg)
free(table->columns);
return NULL;
}
+
static void pgsql_out_stop()
{
int i;
@@ -1402,9 +1440,31 @@ static void pgsql_out_stop()
pthread_t threads[NUM_TABLES];
#endif
+ /* Commit the transactions, so that multiple processes can
+ * access the data simultaneously to process the rest in parallel
+ * as well as see the newly created tables.
+ */
+ pgsql_out_commit();
+ Options->mid->commit();
+ /* To prevent deadlocks in parallel processing, the mid tables need
+ * to stay out of a transaction. In this stage output tables are only
+ * written to and not read, so they can be processed as several parallel
+ * independent transactions
+ */
+ for (i=0; i<NUM_TABLES; i++) {
+ PGconn *sql_conn = tables[i].sql_conn;
+ pgsql_exec(sql_conn, PGRES_COMMAND_OK, "BEGIN");
+ }
/* Processing any remaing to be processed ways */
Options->mid->iterate_ways( pgsql_out_way );
+ pgsql_out_commit();
+ Options->mid->commit();
+ for (i=0; i<NUM_TABLES; i++) {
+ PGconn *sql_conn = tables[i].sql_conn;
+ pgsql_exec(sql_conn, PGRES_COMMAND_OK, "BEGIN");
+ }
Options->mid->iterate_relations( pgsql_process_relation );
+ pgsql_out_commit();
#ifdef HAVE_PTHREAD
if (Options->parallel_indexing) {
@@ -1663,8 +1723,10 @@ static int pgsql_modify_relation(osmid_t osm_id, struct member *members, int mem
struct output_t out_pgsql = {
.start = pgsql_out_start,
+ .connect = pgsql_out_connect,
.stop = pgsql_out_stop,
.cleanup = pgsql_out_cleanup,
+ .close = pgsql_out_close,
.node_add = pgsql_add_node,
.way_add = pgsql_add_way,
.relation_add = pgsql_add_relation,
diff --git a/output.h b/output.h
index 7cca650..cd68453 100644
--- a/output.h
+++ b/output.h
@@ -30,6 +30,7 @@ struct output_options {
int slim; /* In slim mode */
int cache; /* Memory usable for cache in MB */
struct middle_t *mid; /* Mid storage to use */
+ struct output_t *out; /* Output type used */
const char *tblsmain_index; /* Pg Tablespace to store indexes on main tables */
const char *tblsslim_index; /* Pg Tablespace to store indexes on slim tables */
const char *tblsmain_data; /* Pg Tablespace to store main tables */
@@ -45,12 +46,15 @@ struct output_options {
int keep_coastlines;
int parallel_indexing;
int alloc_chunkwise;
+ int num_procs;
};
struct output_t {
int (*start)(const struct output_options *options);
+ int (*connect)(const struct output_options *options);
void (*stop)();
void (*cleanup)(void);
+ void (*close)(void);
// void (*process)(struct middle_t *mid);
// int (*node)(osmid_t id, struct keyval *tags, double node_lat, double node_lon);
// int (*way)(osmid_t id, struct keyval *tags, struct osmNode *nodes, int count);
_______________________________________________
dev mailing list
[email protected]
http://lists.openstreetmap.org/listinfo/dev