Hi,

I think I now have a first version of a patch that speeds up osm2pgsql by
parallelizing certain parts of it. Specifically, it parallelizes the
"Going over pending ways" and "Going over pending relations" stages.

Previously, osm2pgsql would fetch all ways/relations that are marked as
"pending" and process them one by one. With this patch, osm2pgsql spawns
a configurable number of workers that go through the list in
parallel. If osm2pgsql is given enough cache, these stages are mostly
CPU-bound during the initial import, so parallelization gives a near-linear
speedup. If not enough cache is available, a speedup is still
possible, although it then depends heavily on the disk
subsystem.

As I only have a laptop available, I can't really do any extensive
(performance) testing of the patch, so I can't say how much speedup it
will give in practice.

Frederik Ramm wrote:
> 
> 
> Have you considered multiprocessing (i.e. fork) instead of
> multithreading - would this perhaps make these things go away elegantly?
> Personally I abhor multithreading for the complexity it brings at
> (usually) little gain compared to simply forking a few worker processes
> but of course YMMV especially if you want tight communication between
> workers.
> 

In this version of the patch I chose forking, which was indeed
considerably easier than figuring out which parts are and
are not thread-safe.


I have done a fair amount of testing with the patch, and so far everything
appears to work. However, as I am not 100% sure, I didn't want to simply
commit this patch to svn. Instead, it would be great if someone
could review or test the patch to make sure it works fully. I have
attached the patch to this email.

Any feedback or suggestions are welcome.

Kai
diff --git a/configure.ac b/configure.ac
index b6e0758..8389807 100644
--- a/configure.ac
+++ b/configure.ac
@@ -19,6 +19,8 @@ AC_CONFIG_MACRO_DIR([m4])
 dnl Generate configuration header file
 AC_CONFIG_HEADER(config.h)
 
+AC_HEADER_SYS_WAIT
+
 dnl Find C compiler
 AC_PROG_CC_C99
 
diff --git a/middle-pgsql.c b/middle-pgsql.c
index 28cb2f8..c09e2fb 100644
--- a/middle-pgsql.c
+++ b/middle-pgsql.c
@@ -19,6 +19,9 @@
 #ifdef HAVE_PTHREAD
 #include <pthread.h>
 #endif
+#ifdef HAVE_SYS_WAIT_H
+#include <sys/wait.h>
+#endif 
 
 #include <libpq-fe.h>
 
@@ -56,6 +59,7 @@ struct table_desc {
     const char *array_indexes;
 
     int copyMode;    /* True if we are in copy mode */
+    int transactionMode;    /* True if we are in an extended transaction */
     PGconn *sql_conn;
 };
 
@@ -196,6 +200,8 @@ int nodesCacheHits, nodesCacheLookups;
 
 static int Append;
 
+static const struct output_options *out_options; /* set by pgsql_start(); forked helpers reconnect with it */
+
 static inline int id2block(osmid_t id)
 {
     // + NUM_BLOCKS/2 allows for negative IDs
@@ -410,6 +416,46 @@ int pgsql_ram_nodes_get(struct osmNode *out, osmid_t id)
     return 0;
 }

+/* Open a fresh connection for every middle table and re-issue the table's
+ * prepare, prepare_intarray and copy statements on it. Called in forked
+ * helper processes, which need their own database connections; the
+ * inherited handles are overwritten rather than PQfinish()ed, because they
+ * still belong to the parent's sessions. No transaction is started here.
+ * Exits via exit_nicely() if a connection fails; otherwise returns 0. */
+static int pgsql_connect(const struct output_options *options) {
+    int i;
+    /* We use a connection per table to enable the use of COPY */
+    for (i=0; i<num_tables; i++) {
+        PGconn *sql_conn;
+        sql_conn = PQconnectdb(options->conninfo);
+
+        /* Check to see that the backend connection was successfully made */
+        if (PQstatus(sql_conn) != CONNECTION_OK) {
+            fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(sql_conn));
+            exit_nicely();
+        }
+        tables[i].sql_conn = sql_conn;
+        /* A fresh connection is neither inside a transaction nor in COPY
+         * mode; don't trust the flags inherited from the parent process. */
+        tables[i].transactionMode = 0;
+        tables[i].copyMode = 0;
+
+        if (tables[i].prepare) {
+            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare);
+        }
+
+        if (tables[i].prepare_intarray) {
+            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare_intarray);
+        }
+
+        if (tables[i].copy) {
+            pgsql_exec(sql_conn, PGRES_COPY_IN, "%s", tables[i].copy);
+            tables[i].copyMode = 1;
+        }
+    }
+    return 0;
+}
+
 static void pgsql_cleanup(void)
 {
     int i;
@@ -961,8 +996,10 @@ static int pgsql_ways_delete(osmid_t osm_id)
 
 static void pgsql_iterate_ways(int (*callback)(osmid_t id, struct keyval *tags, struct osmNode *nodes, int count, int exists))
 {
+    int noProcs = out_options->num_procs;
+    int pid = 0;
     PGresult   *res_ways;
-    int i, count = 0;
+    int i, p, count = 0;
     /* The flag we pass to indicate that the way in question might exist already in the database */
     int exists = Append;
 
@@ -975,8 +1012,31 @@ static void pgsql_iterate_ways(int (*callback)(osmid_t id, struct keyval *tags,
     
     res_ways = pgsql_execPrepared(way_table->sql_conn, "pending_ways", 0, NULL, PGRES_TUPLES_OK);
 
+    
+    /**
+     * To speed up processing of pending ways, fork noProcs-1 helper processes; together
+     * with the parent, each of the noProcs workers handles every noProcs-th pending way.
+     */
+    fprintf(stderr, "\nUsing %i helper-processes\n", noProcs);
+    for (p = 1; p < noProcs; p++) {
+        pid=fork();
+        if (pid==0) break;
+        if (pid==-1) {
+            exit_nicely();
+        }
+    }
+    if ((pid == 0) && (noProcs > 1)) {
+        /* After forking, need to reconnect to the postgresql db */
+        out_options->out->connect(out_options);
+        pgsql_connect(out_options);
+    } else {
+        p = 0;
+    }
+
     //fprintf(stderr, "\nIterating ways\n");
-    for (i = 0; i < PQntuples(res_ways); i++) {
+    /* Use a stride length of the number of worker processes,
+       starting with an offset for each worker process p */
+    for (i = p; i < PQntuples(res_ways); i+= noProcs) {
         osmid_t id = strtoosmid(PQgetvalue(res_ways, i, 0), NULL, 10);
         struct keyval tags;
         struct osmNode *nodes;
@@ -998,12 +1058,35 @@ static void pgsql_iterate_ways(int (*callback)(osmid_t id, struct keyval *tags,
         free(nodes);
         resetList(&tags);
     }
+    if ((pid == 0) && (noProcs > 1)) {
+        /* Helper process: shut down its own middle and output connections, then exit. */
+        pgsql_cleanup();
+        out_options->out->close();
+        exit(0);
+    } else {
+        /* Parent: reap exactly the noProcs-1 helpers forked above, and abort
+         * if any of them failed instead of silently carrying on with only
+         * part of the pending ways processed. */
+        int failed = 0;
+        for (p = 1; p < noProcs; p++) {
+            int status;
+            if (wait(&status) == -1 || !WIFEXITED(status) || WEXITSTATUS(status) != 0)
+                failed = 1;
+        }
+        if (failed) {
+            fprintf(stderr, "\nA helper process failed while processing pending ways\n");
+            exit_nicely();
+        }
+        fprintf(stderr, "\nAll child processes exited\n");
+    }
 
-    PQclear(res_ways);
+    
     fprintf(stderr, "\n");
     time(&end);
     if (end - start > 0)
-        fprintf(stderr, "Pending ways took %ds at a rate of %.2f/s\n",(int)(end - start), ((double)count / (double)(end - start)));
+        fprintf(stderr, "Pending ways took %ds at a rate of %.2f/s\n",(int)(end - start), 
+                ((double)PQntuples(res_ways) / (double)(end - start)));
+    PQclear(res_ways);
 }
 
 static int pgsql_way_changed(osmid_t osm_id)
@@ -1169,7 +1239,9 @@ static int pgsql_rels_delete(osmid_t osm_id)
 static void pgsql_iterate_relations(int (*callback)(osmid_t id, struct member *members, int member_count, struct keyval *tags, int exists))
 {
     PGresult   *res_rels;
-    int i, count = 0;
+    int noProcs = out_options->num_procs;
+    int pid = 0;    /* stays 0 when noProcs == 1 (no fork happens); read after the fork loop */
+    int i, p, count = 0;
     /* The flag we pass to indicate that the way in question might exist already in the database */
     int exists = Append;
 
@@ -1182,8 +1254,23 @@ static void pgsql_iterate_relations(int (*callback)(osmid_t id, struct member *m
     
     res_rels = pgsql_execPrepared(rel_table->sql_conn, "pending_rels", 0, NULL, PGRES_TUPLES_OK);
 
+    fprintf(stderr, "\nUsing %i helper-processes\n", noProcs);
+    for (p = 1; p < noProcs; p++) {
+        pid=fork();
+        if (pid==0) break;
+        if (pid==-1) {
+            exit_nicely();
+        }
+    }
+    if ((pid == 0) && (noProcs > 1)) {
+        out_options->out->connect(out_options);
+        pgsql_connect(out_options);
+    } else {
+        p = 0;
+    }
+
     //fprintf(stderr, "\nIterating ways\n");
-    for (i = 0; i < PQntuples(res_rels); i++) {
+    for (i = p; i < PQntuples(res_rels); i+= noProcs) {
         osmid_t id = strtoosmid(PQgetvalue(res_rels, i, 0), NULL, 10);
         struct keyval tags;
         struct member *members;
@@ -1206,6 +1293,28 @@ static void pgsql_iterate_relations(int (*callback)(osmid_t id, struct member *m
         resetList(&tags);
     }

+    if ((pid == 0) && (noProcs > 1)) {
+        /* Helper process: shut down its own middle and output connections, then exit. */
+        pgsql_cleanup();
+        out_options->out->close();
+        exit(0);
+    } else {
+        /* Parent: reap exactly the noProcs-1 helpers forked above, and abort
+         * if any of them failed instead of silently carrying on with only
+         * part of the pending relations processed. */
+        int failed = 0;
+        for (p = 1; p < noProcs; p++) {
+            int status;
+            if (wait(&status) == -1 || !WIFEXITED(status) || WEXITSTATUS(status) != 0)
+                failed = 1;
+        }
+        if (failed) {
+            fprintf(stderr, "\nA helper process failed while processing pending relations\n");
+            exit_nicely();
+        }
+        fprintf(stderr, "\nAll child processes exited\n");
+    }
+
     PQclear(res_rels);
     fprintf(stderr, "\n");
     time(&end);
@@ -1247,8 +1343,9 @@ static void pgsql_end(void)
         PGconn *sql_conn = tables[i].sql_conn;
  
         // Commit transaction
-        if (tables[i].stop) {
+        if (tables[i].stop && tables[i].transactionMode) {
             pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].stop);
+            tables[i].transactionMode = 0;
         }
 
     }
@@ -1334,6 +1431,7 @@ static inline void set_prefix_and_tbls(const char *prefix, const char *t, const
 
 static int build_indexes;
 
+
 static int pgsql_start(const struct output_options *options)
 {
     PGresult   *res;
@@ -1342,6 +1440,8 @@ static int pgsql_start(const struct output_options *options)
 
     scale = options->scale;
     Append = options->append;
+
+    out_options = options;
     
     /* How much we can fit, and make sure it's odd */
     maxBlocks = (options->cache*((1024*1024)/(PER_BLOCK*sizeof(struct ramNode)))) | 1;
@@ -1433,6 +1533,7 @@ static int pgsql_start(const struct output_options *options)
 
         if (tables[i].start) {
             pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].start);
+            tables[i].transactionMode = 1;
         }
 
         if (dropcreate && tables[i].create) {
@@ -1460,16 +1561,36 @@ static int pgsql_start(const struct output_options *options)
     return 0;
 }

+/* Close the middle tables' transactions: end any COPY in progress on each
+ * table, then run the table's stop statement if pgsql_start() opened a
+ * transaction on it with the table's start statement. */
+static void pgsql_commit(void)
+{
+    struct table_desc *t;
+
+    for (t = tables; t < tables + num_tables; t++) {
+        PGconn *conn = t->sql_conn;
+
+        pgsql_endCopy(t);
+        if (!t->stop || !t->transactionMode)
+            continue;
+        pgsql_exec(conn, PGRES_COMMAND_OK, "%s", t->stop);
+        t->transactionMode = 0;
+    }
+}
+
 static void *pgsql_stop_one(void *arg)
 {
+    time_t start, end;
+    
     struct table_desc *table = arg;
     PGconn *sql_conn = table->sql_conn;
 
     fprintf(stderr, "Stopping table: %s\n", table->name);
     pgsql_endCopy(table);
-    if (table->stop) 
-        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", table->stop);
-
+    //if (table->stop) 
+    //    pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", table->stop);
+    time(&start);
     if (build_indexes && table->array_indexes) {
         char *buffer = (char *) malloc(strlen(table->array_indexes) + 99);
         // we need to insert before the TABLESPACE setting, if any
@@ -1496,7 +1611,8 @@ static void *pgsql_stop_one(void *arg)
     }
     PQfinish(sql_conn);
     table->sql_conn = NULL;
-    fprintf(stderr, "Stopped table: %s\n", table->name);
+    time(&end);
+    fprintf(stderr, "Stopped table: %s in %is\n", table->name, (int)(end - start));
     return NULL;
 }
 
@@ -1541,6 +1657,7 @@ struct middle_t mid_pgsql = {
         .cleanup           = pgsql_cleanup,
         .analyze           = pgsql_analyze,
         .end               = pgsql_end,
+        .commit            = pgsql_commit,
 
         .nodes_set         = pgsql_nodes_set,
 #if 0
diff --git a/middle-ram.c b/middle-ram.c
index e3b7116..d3d7969 100644
--- a/middle-ram.c
+++ b/middle-ram.c
@@ -402,12 +402,16 @@ static void ram_stop(void)
     }
 }
 
+static void ram_commit(void)
+{ /* no-op: the RAM middle has nothing to commit */ }
+
 struct middle_t mid_ram = {
     .start             = ram_start,
     .stop              = ram_stop,
     .end               = ram_end,
     .cleanup           = ram_stop,
     .analyze           = ram_analyze,
+    .commit            = ram_commit,
     .nodes_set         = ram_nodes_set,
 #if 0
     .nodes_get         = ram_nodes_get,
diff --git a/middle.h b/middle.h
index c3e11b2..4db7fb7 100644
--- a/middle.h
+++ b/middle.h
@@ -19,6 +19,7 @@ struct middle_t {
     void (*cleanup)(void);
     void (*analyze)(void);
     void (*end)(void);
+    void (*commit)(void);
 
     int (*nodes_set)(osmid_t id, double lat, double lon, struct keyval *tags);
     int (*nodes_get_list)(struct osmNode *out, osmid_t *nds, int nd_count);
diff --git a/osm2pgsql.c b/osm2pgsql.c
index b64d2d3..e5366e0 100644
--- a/osm2pgsql.c
+++ b/osm2pgsql.c
@@ -192,6 +192,8 @@ static void long_usage(char *arg0)
     printf("   -K|--keep-coastlines\tKeep coastline data rather than filtering it out.\n");
     printf("              \t\tBy default natural=coastline tagged data will be discarded based on the\n");
     printf("              \t\tassumption that post-processed Coastline Checker shapefiles will be used.\n");
+    printf("      --number-processes\t\tSpecifies the number of parallel processes used for certain operations\n");
+    printf("             \t\tDefault is 2\n");
     printf("   -I|--disable-parallel-indexing\t\tDisable indexing all tables concurrently.\n");
     printf("      --alloc-chunk\t\tAllocate node cache in chunks rather than as a whole.\n");
     printf("   -h|--help\t\tHelp information.\n");
@@ -327,6 +329,7 @@ int main(int argc, char *argv[])
     int enable_multi = 0;
     int parallel_indexing = 1;
     int alloc_chunkwise = 0;
+    int num_procs = 2;
     const char *expire_tiles_filename = "dirty_tiles";
     const char *db = "gis";
     const char *username=NULL;
@@ -394,6 +397,7 @@ int main(int argc, char *argv[])
             {"version", 0, 0, 'V'},
             {"disable-parallel-indexing", 0, 0, 'I'},
             {"alloc-chunk", 0, 0, 204},
+            {"number-processes", 1, 0, 205},
             {0, 0, 0, 0}
         };
 
@@ -451,6 +455,7 @@ int main(int argc, char *argv[])
 #endif
                 break;
             case 204: alloc_chunkwise=1; break;
+            case 205: num_procs = atoi(optarg); break;
             case 'V': exit(EXIT_SUCCESS);
             case '?':
             default:
@@ -476,6 +481,8 @@ int main(int argc, char *argv[])
 
     if (cache < 0) cache = 0;
 
+    if (num_procs < 1) num_procs = 1;
+
     if (pass_prompt)
         password = simple_prompt("Password:", 100, 0);
     else {
@@ -530,6 +537,7 @@ int main(int argc, char *argv[])
     options.keep_coastlines = keep_coastlines;
     options.parallel_indexing = parallel_indexing;
     options.alloc_chunkwise = alloc_chunkwise;
+    options.num_procs = num_procs;
 
     if (strcmp("pgsql", output_backend) == 0) {
       osmdata.out = &out_pgsql;
@@ -541,6 +549,7 @@ int main(int argc, char *argv[])
       fprintf(stderr, "Output backend `%s' not recognised. Should be one of [pgsql, gazetteer, null].\n", output_backend);
       exit(EXIT_FAILURE);
     }
+    options.out = osmdata.out;
 
     if (strcmp("auto", input_reader) != 0) {
       if (strcmp("libxml2", input_reader) == 0) {
diff --git a/output-pgsql.c b/output-pgsql.c
index 954405f..8234536 100644
--- a/output-pgsql.c
+++ b/output-pgsql.c
@@ -1138,6 +1138,34 @@ static int pgsql_out_relation(osmid_t id, struct keyval *rel_tags, struct osmNod
     return 0;
 }

+/* Open a fresh connection for every output table, re-create the get_way
+ * prepared statement on it and start a transaction. Called in forked helper
+ * processes, which need their own database connections; the inherited
+ * handles are overwritten rather than PQfinish()ed, because they still
+ * belong to the parent's sessions.
+ * Exits via exit_nicely() if a connection fails; otherwise returns 0. */
+static int pgsql_out_connect(const struct output_options *options) {
+    int i;
+    for (i=0; i<NUM_TABLES; i++) {
+        PGconn *sql_conn;
+        sql_conn = PQconnectdb(options->conninfo);
+        
+        /* Check to see that the backend connection was successfully made */
+        if (PQstatus(sql_conn) != CONNECTION_OK) {
+            fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(sql_conn));
+            exit_nicely();
+        }
+        tables[i].sql_conn = sql_conn;
+        /* A fresh connection is never in COPY mode, whatever the inherited flag says */
+        tables[i].copyMode = 0;
+        /* pgsql_out_start() prepares get_way on every output connection; a
+         * helper's connection must offer the same statement. */
+        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "PREPARE get_way (" POSTGRES_OSMID_TYPE ") AS SELECT ST_AsText(way) FROM %s WHERE osm_id = $1;\n", tables[i].name);
+        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "BEGIN");
+    }
+    return 0;
+}
+
 static int pgsql_out_start(const struct output_options *options)
 {
     char *sql, tmp[256];
@@ -1274,8 +1290,8 @@ static int pgsql_out_start(const struct output_options *options)
             PQclear(res);
 
             /* change the type of the geometry column if needed - this can only change to a more permisive type */
-            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "UPDATE geometry_columns SET type = '%s' where type != '%s' and f_table_name = '%s' and f_geometry_column = 'way'",
-                        tables[i].type, tables[i].type, tables[i].name);
+            //            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "UPDATE geometry_columns SET type = '%s' where type != '%s' and f_table_name = '%s' and f_geometry_column = 'way'",
+            //           tables[i].type, tables[i].type, tables[i].name);
         }
         pgsql_exec(sql_conn, PGRES_COMMAND_OK, "PREPARE get_way (" POSTGRES_OSMID_TYPE ") AS SELECT ST_AsText(way) FROM %s WHERE osm_id = $1;\n", tables[i].name);
         
@@ -1343,6 +1359,33 @@ static void pgsql_pause_copy(struct s_table *table)
     table->copyMode = 0;
 }

+/* Commit and disconnect every output table (leaving COPY mode first).
+ * Forked helper processes call this through out->close before exiting. */
+static void pgsql_out_close(void)
+{
+    struct s_table *t;
+
+    for (t = tables; t < tables + NUM_TABLES; t++) {
+        pgsql_pause_copy(t);
+        pgsql_exec(t->sql_conn, PGRES_COMMAND_OK, "COMMIT");
+        PQfinish(t->sql_conn);
+        t->sql_conn = NULL;
+    }
+}
+
+/* Commit the open transaction on every output table (leaving COPY mode
+ * first). Unlike pgsql_out_close(), the connections stay open. */
+static void pgsql_out_commit(void)
+{
+    struct s_table *t;
+
+    for (t = tables; t < tables + NUM_TABLES; t++) {
+        pgsql_pause_copy(t);
+        fprintf(stderr, "Committing transaction for %s\n", t->name);
+        pgsql_exec(t->sql_conn, PGRES_COMMAND_OK, "COMMIT");
+    }
+}
+
 static void *pgsql_out_stop_one(void *arg)
 {
     struct s_table *table = arg;
@@ -1356,8 +1393,8 @@ static void *pgsql_out_stop_one(void *arg)
 
     pgsql_pause_copy(table);
     // Commit transaction
-    fprintf(stderr, "Committing transaction for %s\n", table->name);
-    pgsql_exec(sql_conn, PGRES_COMMAND_OK, "COMMIT");
+    //fprintf(stderr, "Committing transaction for %s\n", table->name);
+    //pgsql_exec(sql_conn, PGRES_COMMAND_OK, "COMMIT");
     if (!Options->append)
     {
         time_t start, end;
@@ -1395,6 +1432,7 @@ static void *pgsql_out_stop_one(void *arg)
     free(table->columns);
     return NULL;
 }
+
 static void pgsql_out_stop()
 {
     int i;
@@ -1402,9 +1440,31 @@ static void pgsql_out_stop()
     pthread_t threads[NUM_TABLES];
 #endif
 
+    /* Commit the transactions, so that multiple processes can
+     * access the data simultaneously to process the rest in parallel
+     * and can see the newly created tables.
+     */
+    pgsql_out_commit();
+    Options->mid->commit();
+    /* To prevent deadlocks in parallel processing, the mid tables need
+     * to stay out of a transaction. In this stage output tables are only
+     * written to and not read, so they can be processed as several parallel
+     * independent transactions.
+     */
+    for (i=0; i<NUM_TABLES; i++) {
+        PGconn *sql_conn = tables[i].sql_conn;
+        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "BEGIN");
+    }
     /* Processing any remaing to be processed ways */
     Options->mid->iterate_ways( pgsql_out_way );
+    pgsql_out_commit();
+    Options->mid->commit();
+    for (i=0; i<NUM_TABLES; i++) {
+        PGconn *sql_conn = tables[i].sql_conn;
+        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "BEGIN");
+    }
     Options->mid->iterate_relations( pgsql_process_relation );
+    pgsql_out_commit();
 
 #ifdef HAVE_PTHREAD
     if (Options->parallel_indexing) {
@@ -1663,8 +1723,10 @@ static int pgsql_modify_relation(osmid_t osm_id, struct member *members, int mem
 
 struct output_t out_pgsql = {
         .start           = pgsql_out_start,
+        .connect         = pgsql_out_connect,
         .stop            = pgsql_out_stop,
         .cleanup         = pgsql_out_cleanup,
+        .close           = pgsql_out_close,
         .node_add        = pgsql_add_node,
         .way_add         = pgsql_add_way,
         .relation_add    = pgsql_add_relation,
diff --git a/output.h b/output.h
index 7cca650..cd68453 100644
--- a/output.h
+++ b/output.h
@@ -30,6 +30,7 @@ struct output_options {
   int slim;        /* In slim mode */
   int cache;       /* Memory usable for cache in MB */
   struct middle_t *mid;  /* Mid storage to use */
+  struct output_t *out;  /* Output type used */
   const char *tblsmain_index;     /* Pg Tablespace to store indexes on main tables */
   const char *tblsslim_index;     /* Pg Tablespace to store indexes on slim tables */
   const char *tblsmain_data;     /* Pg Tablespace to store main tables */
@@ -45,12 +46,15 @@ struct output_options {
   int keep_coastlines;
   int parallel_indexing;
   int alloc_chunkwise;
+  int num_procs;
 };
 
 struct output_t {
     int (*start)(const struct output_options *options);
+    int (*connect)(const struct output_options *options);
     void (*stop)();
     void (*cleanup)(void);
+    void (*close)(void);
 //    void (*process)(struct middle_t *mid);
 //    int (*node)(osmid_t id, struct keyval *tags, double node_lat, double node_lon);
 //    int (*way)(osmid_t id, struct keyval *tags, struct osmNode *nodes, int count);
_______________________________________________
dev mailing list
[email protected]
http://lists.openstreetmap.org/listinfo/dev

Reply via email to