Hello.

I have a bit of trouble understanding the current DataPort and why it does not work on huge tables. It looks very much like Cayenne tries to read all of the rows from the source table into memory before it starts inserting. Setting setPageSize on the select query did not fix the problem.
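For reference, this is roughly the kind of paged select I was hoping would do the trick (just a sketch; "HugeTable" is a made-up entity name, and I am assuming a plain SelectQuery against a DataContext, which is not necessarily what DataPort builds internally):

    import java.util.List;
    import org.objectstyle.cayenne.access.DataContext;
    import org.objectstyle.cayenne.query.SelectQuery;

    DataContext context = DataContext.createDataContext();
    SelectQuery select = new SelectQuery("HugeTable"); // made-up entity name
    select.setFetchingDataRows(true); // plain data rows, no DataObjects
    select.setPageSize(100);          // rows should be faulted in page by page
    List rows = context.performQuery(select);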

I have attached my current JDBC-based processInsert method for others to play with. It avoids some of the usual problems with plain JDBC by using the DbAdapter to bind parameters and to determine whether batch updates are supported.

Regards,
 - Tore.

protected void processInsert(List entities) throws CayenneException {
        // Allow delegate to modify the list of entities
        // any way it wants. For instance delegate may filter
        // or sort the list (though it doesn't have to, and can simply
        // pass through the original list).
        if (delegate != null) {
            entities = delegate.willCleanData(this, entities);
        }

        if (entities == null || entities.isEmpty()) {
            return;
        }

        Connection sourceConn = null;
        Connection destConn = null;

        DbAdapter destAdapter = getDestinationNode().getAdapter();
        boolean useInsertBatch = destAdapter.supportsBatchUpdates();

        try {
            sourceConn = getSourceNode().getDataSource().getConnection();
            destConn = getDestinationNode().getDataSource().getConnection();
            destConn.setAutoCommit(false);

            // process ordered list of entities one by one
            Iterator it = entities.iterator();
            while (it.hasNext()) {
                DbEntity entity = (DbEntity) it.next();

                // skip derived DbEntities...
                if (entity instanceof DerivedDbEntity) {
                    continue;
                }

                StringBuffer selectsql = new StringBuffer();
                selectsql.append("SELECT ");

                StringBuffer insertsql = new StringBuffer();
                insertsql.append("INSERT INTO ");
                insertsql.append(entity.getFullyQualifiedName());
                insertsql.append(' ');

                StringBuffer fields = new StringBuffer();
                StringBuffer parameters = new StringBuffer();

                // make sure we always are using the same order
                List attributes = new ArrayList(entity.getAttributes());

                for (Iterator ait = attributes.iterator(); ait.hasNext();) {
                    DbAttribute attribute = (DbAttribute) ait.next();
                    fields.append(attribute.getName());
                    parameters.append('?');
                    if (ait.hasNext()) {
                        fields.append(',');
                        parameters.append(',');
                    }
                }

                selectsql.append(fields);
                selectsql.append(" FROM ");
                selectsql.append(entity.getFullyQualifiedName());

                insertsql.append('(');
                insertsql.append(fields);
                insertsql.append(") VALUES (");
                insertsql.append(parameters);
                insertsql.append(')');

                QueryLogger.log(selectsql.toString());
                QueryLogger.log(insertsql.toString());

                Statement selectst = sourceConn.createStatement();
                // TODO: configure/determine fetch size.
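                // NOTE: some drivers (PostgreSQL, for instance) only honor the fetch
                // size and stream rows when autoCommit is disabled on the connection,
                // so the source connection may need setAutoCommit(false) as well.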
                selectst.setFetchSize(100);
                ResultSet rs = selectst.executeQuery(selectsql.toString());

                PreparedStatement insertst = destConn.prepareStatement(insertsql.toString());

                int size = 0;        // rough byte count since the last commit
                long rows = 0;       // total rows copied for this entity
                int rowsInBatch = 0; // rows copied since the last commit
                long start = System.currentTimeMillis();

                while (rs.next()) {
                    rows++;
                    rowsInBatch++;
                    for (int i = 0; i < attributes.size(); i++) {
                        DbAttribute attribute = (DbAttribute) attributes.get(i);

                        int idx = i + 1;
                        int type = attribute.getType();
                        int prec = attribute.getPrecision();

                        // read LOB-like types with their specific getters so the amount
                        // of data can be tracked for the commit heuristic below
                        switch (type) {
                        case Types.BLOB:
                            Blob blob = rs.getBlob(idx);
                            if (blob != null) {
                                size = size + (int) blob.length();
                            }
                            destAdapter.bindParameter(insertst, blob, idx, type, prec);
                            break;
                        case Types.VARBINARY:
                        case Types.LONGVARBINARY:
                            byte[] bindata = rs.getBytes(idx);
                            if (bindata != null) {
                                size = size + bindata.length;
                            }
                            destAdapter.bindParameter(insertst, bindata, idx, type, prec);
                            break;
                        case Types.CLOB:
                        case Types.CHAR:
                        case Types.VARCHAR:
                            String string = rs.getString(idx);
                            if (string != null) {
                                size = size + string.length();
                            }
                            destAdapter.bindParameter(insertst, string, idx, type, prec);
                            break;
                        default:
                            Object o = rs.getObject(idx);
                            size = size + 1;
                            destAdapter.bindParameter(insertst, o, idx, type, prec);
                            break;
                        }

                    }

                    if (useInsertBatch) {
                        insertst.addBatch();
                    } else {
                        insertst.execute();
                    }

                    // try to be clever about when to commit
                    // TODO: configure
                    if (size > 1000000) {
                        if (useInsertBatch) {
                            insertst.executeBatch();
                        }
                        // avoid division by zero when less than a second has elapsed
                        long used = Math.max(1, (System.currentTimeMillis() - start) / 1000L);
                        QueryLogger.log("partial commit " + entity.getName() + " at row " + rows
                                + ". " + (rowsInBatch / used) + " rows/s.");
                        destConn.commit();
                        size = 0;
                        rowsInBatch = 0;
                        start = System.currentTimeMillis();
                    }
                }

                if (useInsertBatch) {
                    insertst.executeBatch();
                }
                QueryLogger.log("commit " + entity.getName() + ". " + rows + " rows.");
                destConn.commit();

                rs.close();
                insertst.close();
                selectst.close();

            }

        } catch (SQLException e) {
            throw new CayenneException(e);
        } catch (Exception e) {
            throw new CayenneException(e);
        } finally {
            if (sourceConn != null) {
                try {
                    sourceConn.close();
                } catch (SQLException e) {
                }
            }
            if (destConn != null) {
                try {
                    destConn.close();
                } catch (SQLException e) {
                }
            }
        }
    }
