This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hop.git
The following commit(s) were added to refs/heads/master by this push:
new 2b8f979 HOP-2671: execute "connection sql" on bulk load
new 10eaf40 Merge pull request #793 from hansva/master
2b8f979 is described below
commit 2b8f9791cf077dab3bd2eaf9816eaa5b27f8e1b6
Author: Hans Van Akelyen <[email protected]>
AuthorDate: Fri May 7 11:48:18 2021 +0200
HOP-2671: execute "connection sql" on bulk load
---
.../monetdbbulkloader/MonetDbBulkLoader.java | 70 ++++++++++------------
1 file changed, 33 insertions(+), 37 deletions(-)
diff --git a/plugins/transforms/monetdbbulkloader/src/main/java/org/apache/hop/pipeline/transforms/monetdbbulkloader/MonetDbBulkLoader.java b/plugins/transforms/monetdbbulkloader/src/main/java/org/apache/hop/pipeline/transforms/monetdbbulkloader/MonetDbBulkLoader.java
index 9e97a0f..240c301 100644
--- a/plugins/transforms/monetdbbulkloader/src/main/java/org/apache/hop/pipeline/transforms/monetdbbulkloader/MonetDbBulkLoader.java
+++ b/plugins/transforms/monetdbbulkloader/src/main/java/org/apache/hop/pipeline/transforms/monetdbbulkloader/MonetDbBulkLoader.java
@@ -47,14 +47,9 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
private static final Class<?> PKG =
MonetDbBulkLoaderMeta.class; // for i18n purposes, needed by
Translator2!!
- private String message;
private IRowMeta physicalTableRowMeta;
private static final String ERROR_LOADING_DATA = "Error loading data: ";
- public String getMessage() {
- return message;
- }
-
public MonetDbBulkLoader(
TransformMeta transformMeta,
MonetDbBulkLoaderMeta meta,
@@ -65,10 +60,6 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
super(transformMeta, meta, data, copyNr, pipelineMeta, pipeline);
}
- protected void setMessage(String message) {
- this.message = message;
- }
-
protected String escapeOsPath(String path, boolean isWindows) {
StringBuilder sb = new StringBuilder();
@@ -85,7 +76,7 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
return sb.toString();
}
- public boolean execute(MonetDbBulkLoaderMeta meta, boolean wait) throws
HopException {
+ public boolean execute(MonetDbBulkLoaderMeta meta) throws HopException {
if (log.isDetailed()) {
logDetailed("Started execute");
}
@@ -129,7 +120,9 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
} catch (Exception e) {
// try again, with the unquoted table...
try {
- physicalTableRowMeta = db.getTableFields(meta.getTableName());
+ if (db != null) {
+ physicalTableRowMeta = db.getTableFields(meta.getTableName());
+ }
} catch (Exception e1) {
logBasic("Could not get metadata for the physical table " +
data.schemaTable + ".");
}
@@ -155,10 +148,8 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
setOutputDone();
if (!first) {
try {
- writeBufferToMonetDB();
+ writeBufferToMonetDB(meta.getDatabaseMeta());
data.out.flush();
- } catch (HopException ke) {
- throw ke;
} finally {
data.mserver.close();
}
@@ -178,10 +169,10 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
// execute the psql statement...
//
- execute(meta, true);
+ execute(meta);
}
- writeRowToMonetDB(getInputRowMeta(), r);
+ writeRowToMonetDB(getInputRowMeta(), r, meta.getDatabaseMeta());
putRow(getInputRowMeta(), r);
incrementLinesOutput();
@@ -195,9 +186,10 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
}
}
- protected void writeRowToMonetDB(IRowMeta rowMeta, Object[] r) throws
HopException {
+ protected void writeRowToMonetDB(IRowMeta rowMeta, Object[] r, DatabaseMeta
dm)
+ throws HopException {
if (data.bufferIndex == data.bufferSize || log.isDebug()) {
- writeBufferToMonetDB();
+ writeBufferToMonetDB(dm);
}
addRowToBuffer(rowMeta, r);
}
@@ -237,7 +229,6 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
len = MonetDBDatabaseMeta.DEFAULT_VARCHAR_LENGTH;
}
if (str.length() > len) {
- // TODO log this event
str = str.substring(0, len);
}
line.append(str);
@@ -260,7 +251,6 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
}
break;
//
- // TODO: Check MonetDB API for true column types and help set or
suggest the correct
// formatter pattern to
// the user.
//
@@ -349,7 +339,7 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
// Now that we have the line, grab the content and store it in the
buffer...
//
- data.rowBuffer[data.bufferIndex] = line.toString(); //
line.toByteArray();
+ data.rowBuffer[data.bufferIndex] = line.toString();
data.bufferIndex++;
} catch (Exception e) {
throw new HopException("Error serializing rows of data to the MonetDB
API (MAPI).", e);
@@ -419,7 +409,7 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
}
}
- protected void writeBufferToMonetDB() throws HopException {
+ protected void writeBufferToMonetDB(DatabaseMeta dm) throws HopException {
if (data.bufferIndex == 0) {
return;
}
@@ -450,7 +440,24 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
logDetailed(cmd);
}
- data.out.write('s');
+ // See if we need to execute extra SQL statements...
+ //
+ String sql = dm.getConnectSql();
+
+ // only execute if the SQL is not empty, null and is not just a bunch of
+ // spaces, tabs, CR etc.
+ if (!Utils.isEmpty(sql) && !Const.onlySpaces(sql)) {
+ String[] sqlStatements = sql.split(";");
+ for (String statement : sqlStatements) {
+ data.out.write('s');
+ data.out.write(statement);
+ data.out.write(';');
+ data.out.newLine();
+ }
+ } else {
+ data.out.write('s');
+ }
+
data.out.write(cmdBuff.toString());
data.out.newLine();
@@ -553,7 +560,7 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
// Allocate the buffer
//
- data.rowBuffer = new String[data.bufferSize]; // new
byte[data.bufferSize][];
+ data.rowBuffer = new String[data.bufferSize];
data.bufferIndex = 0;
// Make sure our database connection settings are consistent with our
dialog settings by
@@ -580,11 +587,6 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
}
@Override
- public void dispose() {
- super.dispose();
- }
-
- @Override
public MonetDbBulkLoaderData getData() {
return this.data;
}
@@ -600,9 +602,7 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
String password = Utils.resolvePassword(variables,
Const.NVL(dm.getPassword(), ""));
String db = resolve(Const.NVL(dm.getDatabaseName(), ""));
- MapiSocket mserver =
- getMonetDBConnection(hostname, Integer.parseInt(portnum), user,
password, db);
- return mserver;
+ return getMonetDBConnection(hostname, Integer.parseInt(portnum), user,
password, db, log);
}
protected static MapiSocket getMonetDBConnection(
@@ -681,15 +681,13 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
// marker that should not be omitted, likewise the
// trailing semicolon
out.write('s');
- System.out.println(query);
out.write(query);
out.write(';');
out.newLine();
out.writeLine("");
- String line = null;
- while ((line = in.readLine()) != null) {
+ while (in.readLine() != null) {
int type = in.getLineType();
// read till we get back to the prompt
@@ -699,10 +697,8 @@ public class MonetDbBulkLoader extends
BaseTransform<MonetDbBulkLoaderMeta, Mone
switch (type) {
case BufferedMCLReader.ERROR:
- System.err.println(line);
break;
case BufferedMCLReader.RESULT:
- System.out.println(line);
break;
default:
// unknown, header, ...