This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2760cae73 [Bugfix][Connector-V2][maxcompute] sink commit with Block
not exsits on server (#4725)
2760cae73 is described below
commit 2760cae73c3c45945378e93abcb703873a6453d9
Author: stdnt-xiao <[email protected]>
AuthorDate: Wed May 10 11:32:58 2023 +0800
[Bugfix][Connector-V2][maxcompute] sink commit with Block not exsits on
server (#4725)
---
.../seatunnel/maxcompute/sink/MaxcomputeWriter.java | 18 +++++++++++-------
1 file changed, 11 insertions(+), 7 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
index c3d157fbf..ba042444d 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
@@ -42,9 +42,10 @@ import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.Maxcom
@Slf4j
public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
- private final RecordWriter recordWriter;
+ private RecordWriter recordWriter;
private final TableTunnel.UploadSession session;
private final TableSchema tableSchema;
+ private static final Long BLOCK_0 = 0L;
public MaxcomputeWriter(Config pluginConfig) {
try {
@@ -65,7 +66,7 @@ public class MaxcomputeWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
pluginConfig.getString(PROJECT.key()),
pluginConfig.getString(TABLE_NAME.key()));
}
- this.recordWriter =
session.openRecordWriter(Thread.currentThread().getId());
+ this.recordWriter = session.openRecordWriter(BLOCK_0);
log.info("open record writer success");
} catch (Exception e) {
throw new
MaxcomputeConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e);
@@ -80,11 +81,14 @@ public class MaxcomputeWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
@Override
public void close() throws IOException {
- this.recordWriter.close();
- try {
- this.session.commit(new Long[] {Thread.currentThread().getId()});
- } catch (Exception e) {
- throw new
MaxcomputeConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e);
+ if (recordWriter != null) {
+ recordWriter.close();
+ try {
+ session.commit(new Long[] {BLOCK_0});
+ } catch (Exception e) {
+ throw new
MaxcomputeConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e);
+ }
+ recordWriter = null;
}
}
}