This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f351ceaf4 [Hotfix][connector-v2] fix SemanticXidGenerator#generateXid
indexOutOfBounds #3701 (#3705)
f351ceaf4 is described below
commit f351ceaf4ba7276882ec4e9fb73f3cc1874ff1f0
Author: ic4y <[email protected]>
AuthorDate: Tue Dec 13 10:21:40 2022 +0800
[Hotfix][connector-v2] fix SemanticXidGenerator#generateXid
indexOutOfBounds #3701 (#3705)
---
.../jdbc/internal/xa/SemanticXidGenerator.java | 20 +++++++++++++-------
.../jdbc/internal/xa/SemanticXidGeneratorTest.java | 11 ++++++++---
2 files changed, 21 insertions(+), 10 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
index 4f51c31a5..df26d82b1 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
@@ -33,9 +33,9 @@ import java.util.Arrays;
* <ol>
* <li>To provide uniqueness over other jobs and apps, and other instances
* <li>of this job, gtrid consists of
- * <li>job id (16 bytes)
+ * <li>job id (32 bytes)
* <li>subtask index (4 bytes)
- * <li>checkpoint id (4 bytes)
+ * <li>checkpoint id (8 bytes)
* <li>bqual consists of 4 random bytes (generated using {@link
SecureRandom})
* </ol>
*
@@ -65,8 +65,9 @@ class SemanticXidGenerator
@Override
public Xid generateXid(JobContext context, SinkWriter.Context sinkContext,
long checkpointId) {
byte[] jobIdBytes = context.getJobId().getBytes();
+ Arrays.fill(gtridBuffer, (byte) 0);
checkArgument(jobIdBytes.length <= JOB_ID_BYTES);
- System.arraycopy(jobIdBytes, 0, gtridBuffer, 0, JOB_ID_BYTES);
+ System.arraycopy(jobIdBytes, 0, gtridBuffer, 0, jobIdBytes.length);
writeNumber(sinkContext.getIndexOfSubtask(), Integer.BYTES,
gtridBuffer, JOB_ID_BYTES);
writeNumber(checkpointId, Long.BYTES, gtridBuffer, JOB_ID_BYTES +
Integer.BYTES);
@@ -79,13 +80,18 @@ class SemanticXidGenerator
if (xid.getFormatId() != FORMAT_ID) {
return false;
}
- int subtaskIndex = readNumber(xid.getGlobalTransactionId(),
JOB_ID_BYTES, Integer.BYTES);
- if (subtaskIndex != sinkContext.getIndexOfSubtask()) {
+ int xidSubtaskIndex = readNumber(xid.getGlobalTransactionId(),
JOB_ID_BYTES, Integer.BYTES);
+ if (xidSubtaskIndex != sinkContext.getIndexOfSubtask()) {
return false;
}
+ byte[] xidJobIdBytes = new byte[JOB_ID_BYTES];
+ System.arraycopy(xid.getGlobalTransactionId(), 0, xidJobIdBytes, 0,
JOB_ID_BYTES);
+
byte[] jobIdBytes = new byte[JOB_ID_BYTES];
- System.arraycopy(xid.getGlobalTransactionId(), 0, jobIdBytes, 0,
JOB_ID_BYTES);
- return Arrays.equals(jobIdBytes, context.getJobId().getBytes());
+ byte[] bytes = context.getJobId().getBytes();
+ System.arraycopy(bytes, 0, jobIdBytes, 0, bytes.length);
+
+ return Arrays.equals(jobIdBytes, xidJobIdBytes);
}
private static int readNumber(byte[] bytes, int offset, int numBytes) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
index 906c3bd33..5ff57eac1 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
@@ -28,19 +28,24 @@ import org.junit.jupiter.api.Test;
import javax.transaction.xa.Xid;
class SemanticXidGeneratorTest {
- private JobContext jobContext;
private SemanticXidGenerator xidGenerator;
@BeforeEach
void before() {
- jobContext = new JobContext();
xidGenerator = new SemanticXidGenerator();
xidGenerator.open();
}
@Test
void testBelongsToSubtask() {
- DefaultSinkWriterContext dc1 = new DefaultSinkWriterContext(1);
+ JobContext uuidJobContext = new JobContext();
+ check(uuidJobContext);
+ JobContext longJobContext = new JobContext(Long.MIN_VALUE);
+ check(longJobContext);
+ }
+
+ void check(JobContext jobContext){
+ DefaultSinkWriterContext dc1 = new
DefaultSinkWriterContext(Integer.MAX_VALUE);
Xid xid1 = xidGenerator.generateXid(jobContext, dc1,
System.currentTimeMillis());
Assertions.assertTrue(xidGenerator.belongsToSubtask(xid1, jobContext,
dc1));
Assertions.assertFalse(xidGenerator.belongsToSubtask(xid1, jobContext,
new DefaultSinkWriterContext(2)));