This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2760cae73 [Bugfix][Connector-V2][maxcompute] sink commit with Block 
not exsits on server (#4725)
2760cae73 is described below

commit 2760cae73c3c45945378e93abcb703873a6453d9
Author: stdnt-xiao <[email protected]>
AuthorDate: Wed May 10 11:32:58 2023 +0800

    [Bugfix][Connector-V2][maxcompute] sink commit with Block not exsits on 
server (#4725)
---
 .../seatunnel/maxcompute/sink/MaxcomputeWriter.java    | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
index c3d157fbf..ba042444d 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
@@ -42,9 +42,10 @@ import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.Maxcom
 
 @Slf4j
 public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
-    private final RecordWriter recordWriter;
+    private RecordWriter recordWriter;
     private final TableTunnel.UploadSession session;
     private final TableSchema tableSchema;
+    private static final Long BLOCK_0 = 0L;
 
     public MaxcomputeWriter(Config pluginConfig) {
         try {
@@ -65,7 +66,7 @@ public class MaxcomputeWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
                                 pluginConfig.getString(PROJECT.key()),
                                 pluginConfig.getString(TABLE_NAME.key()));
             }
-            this.recordWriter = 
session.openRecordWriter(Thread.currentThread().getId());
+            this.recordWriter = session.openRecordWriter(BLOCK_0);
             log.info("open record writer success");
         } catch (Exception e) {
             throw new 
MaxcomputeConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e);
@@ -80,11 +81,14 @@ public class MaxcomputeWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
 
     @Override
     public void close() throws IOException {
-        this.recordWriter.close();
-        try {
-            this.session.commit(new Long[] {Thread.currentThread().getId()});
-        } catch (Exception e) {
-            throw new 
MaxcomputeConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e);
+        if (recordWriter != null) {
+            recordWriter.close();
+            try {
+                session.commit(new Long[] {BLOCK_0});
+            } catch (Exception e) {
+                throw new 
MaxcomputeConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e);
+            }
+            recordWriter = null;
         }
     }
 }

Reply via email to