This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hop.git
The following commit(s) were added to refs/heads/master by this push:
new 2279428 HOP-3163 - Kafka Consumer generate locks database transaction
new 6118025 Merge pull request #1029 from sramazzina/HOP-3163
2279428 is described below
commit 22794280c3ad2dba9b3c1cd192ddfede9cd449c6
Author: sergio.ramazzina <[email protected]>
AuthorDate: Mon Aug 30 18:04:55 2021 +0200
HOP-3163 - Kafka Consumer generate locks database transaction
---
.../hop/pipeline/transforms/update/Update.java | 36 ++++++++++++++++------
1 file changed, 27 insertions(+), 9 deletions(-)
diff --git a/plugins/transforms/update/src/main/java/org/apache/hop/pipeline/transforms/update/Update.java b/plugins/transforms/update/src/main/java/org/apache/hop/pipeline/transforms/update/Update.java
index b474adb..89dc2b8 100644
--- a/plugins/transforms/update/src/main/java/org/apache/hop/pipeline/transforms/update/Update.java
+++ b/plugins/transforms/update/src/main/java/org/apache/hop/pipeline/transforms/update/Update.java
@@ -523,30 +523,48 @@ public class Update extends BaseTransform<UpdateMeta, UpdateData>
}
@Override
+ public void batchComplete() throws HopException {
+ // This is needed to properly handle transactions when we are using SingleThreadedExecutor
+ commitBatch(false);
+ }
+
+ @Override
public void dispose() {
+ // This is needed to properly handle transactions when we are using SingleThreadedExecutor
+ commitBatch(true);
+ super.dispose();
+ }
+ private void commitBatch(boolean dispose) {
if (data.db != null) {
try {
if (!data.db.isAutoCommit()) {
if (getErrors() == 0) {
- data.db.emptyAndCommit(data.prepStatementUpdate,
meta.useBatchUpdate());
+ if (dispose) {
+ data.db.emptyAndCommit(data.prepStatementUpdate,
meta.useBatchUpdate());
+ } else {
+ data.db.commit();
+ }
} else {
data.db.rollback();
}
}
- data.db.closePreparedStatement(data.prepStatementUpdate);
- data.db.closePreparedStatement(data.prepStatementLookup);
+ if (dispose) {
+ data.db.closePreparedStatement(data.prepStatementUpdate);
+ data.db.closePreparedStatement(data.prepStatementLookup);
+ }
} catch (HopDatabaseException e) {
logError(
- BaseMessages.getString(PKG,
"Update.Log.UnableToCommitUpdateConnection")
- + data.db
- + "] :"
- + e.toString());
+ BaseMessages.getString(PKG,
"Update.Log.UnableToCommitUpdateConnection")
+ + data.db
+ + "] :"
+ + e.toString());
setErrors(1);
} finally {
- data.db.disconnect();
+ if (dispose)
+ data.db.disconnect();
}
}
- super.dispose();
+
}
}