This is an automated email from the ASF dual-hosted git repository.

hansva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hop.git


The following commit(s) were added to refs/heads/master by this push:
     new 2279428  HOP-3163 - Kafka Consumer generate locks database transaction
     new 6118025  Merge pull request #1029 from sramazzina/HOP-3163
2279428 is described below

commit 22794280c3ad2dba9b3c1cd192ddfede9cd449c6
Author: sergio.ramazzina <[email protected]>
AuthorDate: Mon Aug 30 18:04:55 2021 +0200

    HOP-3163 - Kafka Consumer generate locks database transaction
---
 .../hop/pipeline/transforms/update/Update.java     | 36 ++++++++++++++++------
 1 file changed, 27 insertions(+), 9 deletions(-)

diff --git a/plugins/transforms/update/src/main/java/org/apache/hop/pipeline/transforms/update/Update.java b/plugins/transforms/update/src/main/java/org/apache/hop/pipeline/transforms/update/Update.java
index b474adb..89dc2b8 100644
--- a/plugins/transforms/update/src/main/java/org/apache/hop/pipeline/transforms/update/Update.java
+++ b/plugins/transforms/update/src/main/java/org/apache/hop/pipeline/transforms/update/Update.java
@@ -523,30 +523,48 @@ public class Update extends BaseTransform<UpdateMeta, UpdateData>
   }
 
   @Override
+  public void batchComplete() throws HopException {
+    // This is needed to properly handle transactions when we are using SingleThreadedExecutor
+    commitBatch(false);
+  }
+
+  @Override
   public void dispose() {
+    // This is needed to properly handle transactions when we are using SingleThreadedExecutor
+    commitBatch(true);
+    super.dispose();
+  }
 
+  private void commitBatch(boolean dispose) {
     if (data.db != null) {
       try {
         if (!data.db.isAutoCommit()) {
           if (getErrors() == 0) {
-            data.db.emptyAndCommit(data.prepStatementUpdate, meta.useBatchUpdate());
+            if (dispose) {
+              data.db.emptyAndCommit(data.prepStatementUpdate, meta.useBatchUpdate());
+            } else {
+              data.db.commit();
+            }
           } else {
             data.db.rollback();
           }
         }
-        data.db.closePreparedStatement(data.prepStatementUpdate);
-        data.db.closePreparedStatement(data.prepStatementLookup);
+        if (dispose) {
+          data.db.closePreparedStatement(data.prepStatementUpdate);
+          data.db.closePreparedStatement(data.prepStatementLookup);
+        }
       } catch (HopDatabaseException e) {
         logError(
-            BaseMessages.getString(PKG, "Update.Log.UnableToCommitUpdateConnection")
-                + data.db
-                + "] :"
-                + e.toString());
+                BaseMessages.getString(PKG, "Update.Log.UnableToCommitUpdateConnection")
+                        + data.db
+                        + "] :"
+                        + e.toString());
         setErrors(1);
       } finally {
-        data.db.disconnect();
+        if (dispose)
+          data.db.disconnect();
       }
     }
-    super.dispose();
+
   }
 }

Reply via email to