This is an automated email from the ASF dual-hosted git repository.

hansva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hop.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b8f979  HOP-2671: execute "connection sql" on bulk load
     new 10eaf40  Merge pull request #793 from hansva/master
2b8f979 is described below

commit 2b8f9791cf077dab3bd2eaf9816eaa5b27f8e1b6
Author: Hans Van Akelyen <[email protected]>
AuthorDate: Fri May 7 11:48:18 2021 +0200

    HOP-2671: execute "connection sql" on bulk load
---
 .../monetdbbulkloader/MonetDbBulkLoader.java       | 70 ++++++++++------------
 1 file changed, 33 insertions(+), 37 deletions(-)

diff --git 
a/plugins/transforms/monetdbbulkloader/src/main/java/org/apache/hop/pipeline/transforms/monetdbbulkloader/MonetDbBulkLoader.java
 
b/plugins/transforms/monetdbbulkloader/src/main/java/org/apache/hop/pipeline/transforms/monetdbbulkloader/MonetDbBulkLoader.java
index 9e97a0f..240c301 100644
--- 
a/plugins/transforms/monetdbbulkloader/src/main/java/org/apache/hop/pipeline/transforms/monetdbbulkloader/MonetDbBulkLoader.java
+++ 
b/plugins/transforms/monetdbbulkloader/src/main/java/org/apache/hop/pipeline/transforms/monetdbbulkloader/MonetDbBulkLoader.java
@@ -47,14 +47,9 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
   private static final Class<?> PKG =
       MonetDbBulkLoaderMeta.class; // for i18n purposes, needed by 
Translator2!!
 
-  private String message;
   private IRowMeta physicalTableRowMeta;
   private static final String ERROR_LOADING_DATA = "Error loading data: ";
 
-  public String getMessage() {
-    return message;
-  }
-
   public MonetDbBulkLoader(
       TransformMeta transformMeta,
       MonetDbBulkLoaderMeta meta,
@@ -65,10 +60,6 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
     super(transformMeta, meta, data, copyNr, pipelineMeta, pipeline);
   }
 
-  protected void setMessage(String message) {
-    this.message = message;
-  }
-
   protected String escapeOsPath(String path, boolean isWindows) {
 
     StringBuilder sb = new StringBuilder();
@@ -85,7 +76,7 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
     return sb.toString();
   }
 
-  public boolean execute(MonetDbBulkLoaderMeta meta, boolean wait) throws 
HopException {
+  public boolean execute(MonetDbBulkLoaderMeta meta) throws HopException {
     if (log.isDetailed()) {
       logDetailed("Started execute");
     }
@@ -129,7 +120,9 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
       } catch (Exception e) {
         // try again, with the unquoted table...
         try {
-          physicalTableRowMeta = db.getTableFields(meta.getTableName());
+          if (db != null) {
+            physicalTableRowMeta = db.getTableFields(meta.getTableName());
+          }
         } catch (Exception e1) {
           logBasic("Could not get metadata for the physical table " + 
data.schemaTable + ".");
         }
@@ -155,10 +148,8 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
         setOutputDone();
         if (!first) {
           try {
-            writeBufferToMonetDB();
+            writeBufferToMonetDB(meta.getDatabaseMeta());
             data.out.flush();
-          } catch (HopException ke) {
-            throw ke;
           } finally {
             data.mserver.close();
           }
@@ -178,10 +169,10 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
 
         // execute the psql statement...
         //
-        execute(meta, true);
+        execute(meta);
       }
 
-      writeRowToMonetDB(getInputRowMeta(), r);
+      writeRowToMonetDB(getInputRowMeta(), r, meta.getDatabaseMeta());
       putRow(getInputRowMeta(), r);
       incrementLinesOutput();
 
@@ -195,9 +186,10 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
     }
   }
 
-  protected void writeRowToMonetDB(IRowMeta rowMeta, Object[] r) throws 
HopException {
+  protected void writeRowToMonetDB(IRowMeta rowMeta, Object[] r, DatabaseMeta 
dm)
+      throws HopException {
     if (data.bufferIndex == data.bufferSize || log.isDebug()) {
-      writeBufferToMonetDB();
+      writeBufferToMonetDB(dm);
     }
     addRowToBuffer(rowMeta, r);
   }
@@ -237,7 +229,6 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
                   len = MonetDBDatabaseMeta.DEFAULT_VARCHAR_LENGTH;
                 }
                 if (str.length() > len) {
-                  // TODO log this event
                   str = str.substring(0, len);
                 }
                 line.append(str);
@@ -260,7 +251,6 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
               }
               break;
               //
-              // TODO: Check MonetDB API for true column types and help set or 
suggest the correct
               // formatter pattern to
               // the user.
               //
@@ -349,7 +339,7 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
 
       // Now that we have the line, grab the content and store it in the 
buffer...
       //
-      data.rowBuffer[data.bufferIndex] = line.toString(); // 
line.toByteArray();
+      data.rowBuffer[data.bufferIndex] = line.toString();
       data.bufferIndex++;
     } catch (Exception e) {
       throw new HopException("Error serializing rows of data to the MonetDB 
API (MAPI).", e);
@@ -419,7 +409,7 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
     }
   }
 
-  protected void writeBufferToMonetDB() throws HopException {
+  protected void writeBufferToMonetDB(DatabaseMeta dm) throws HopException {
     if (data.bufferIndex == 0) {
       return;
     }
@@ -450,7 +440,24 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
         logDetailed(cmd);
       }
 
-      data.out.write('s');
+      // See if we need to execute extra SQL statements...
+      //
+      String sql = dm.getConnectSql();
+
+      // only execute if the SQL is not empty, null and is not just a bunch of
+      // spaces, tabs, CR etc.
+      if (!Utils.isEmpty(sql) && !Const.onlySpaces(sql)) {
+        String[] sqlStatements = sql.split(";");
+        for (String statement : sqlStatements) {
+          data.out.write('s');
+          data.out.write(statement);
+          data.out.write(';');
+          data.out.newLine();
+        }
+      } else {
+        data.out.write('s');
+      }
+
       data.out.write(cmdBuff.toString());
       data.out.newLine();
 
@@ -553,7 +560,7 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
 
       // Allocate the buffer
       //
-      data.rowBuffer = new String[data.bufferSize]; // new 
byte[data.bufferSize][];
+      data.rowBuffer = new String[data.bufferSize];
       data.bufferIndex = 0;
 
       // Make sure our database connection settings are consistent with our 
dialog settings by
@@ -580,11 +587,6 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
   }
 
   @Override
-  public void dispose() {
-    super.dispose();
-  }
-
-  @Override
   public MonetDbBulkLoaderData getData() {
     return this.data;
   }
@@ -600,9 +602,7 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
     String password = Utils.resolvePassword(variables, 
Const.NVL(dm.getPassword(), ""));
     String db = resolve(Const.NVL(dm.getDatabaseName(), ""));
 
-    MapiSocket mserver =
-        getMonetDBConnection(hostname, Integer.parseInt(portnum), user, 
password, db);
-    return mserver;
+    return getMonetDBConnection(hostname, Integer.parseInt(portnum), user, 
password, db, log);
   }
 
   protected static MapiSocket getMonetDBConnection(
@@ -681,15 +681,13 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
       // marker that should not be omitted, likewise the
       // trailing semicolon
       out.write('s');
-      System.out.println(query);
       out.write(query);
       out.write(';');
       out.newLine();
 
       out.writeLine("");
 
-      String line = null;
-      while ((line = in.readLine()) != null) {
+      while (in.readLine() != null) {
         int type = in.getLineType();
 
         // read till we get back to the prompt
@@ -699,10 +697,8 @@ public class MonetDbBulkLoader extends 
BaseTransform<MonetDbBulkLoaderMeta, Mone
 
         switch (type) {
           case BufferedMCLReader.ERROR:
-            System.err.println(line);
             break;
           case BufferedMCLReader.RESULT:
-            System.out.println(line);
             break;
           default:
             // unknown, header, ...

Reply via email to