rkhachatryan commented on a change in pull request #15066:
URL: https://github.com/apache/flink/pull/15066#discussion_r590307472
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java
##########
@@ -46,20 +50,29 @@
private static final int FORMAT_ID = 201;
- private transient byte[] gtridBuffer; // globalTransactionId = checkpoint
id (long)
- private transient byte[] bqualBuffer; // branchQualifier = task index +
random bytes
+ private transient byte[]
+ gtridBuffer; // globalTransactionId = job id + task index +
checkpoint id
+ private transient byte[] bqualBuffer; // branchQualifier = random bytes
@Override
public void open() {
- bqualBuffer = getRandomBytes(Long.BYTES);
- gtridBuffer = new byte[Long.BYTES];
+ gtridBuffer = new byte[3 * Long.BYTES];
+ bqualBuffer = getRandomBytes(Integer.BYTES);
}
@Override
public Xid generateXid(RuntimeContext runtimeContext, long checkpointId) {
- writeNumber(runtimeContext.getIndexOfThisSubtask(), gtridBuffer, 0);
- // deliberately write only 4 bytes of checkpoint id and rely on random
generation
- writeNumber((int) checkpointId, gtridBuffer, Integer.BYTES);
+ Optional<JobID> jobId = runtimeContext.getJobId();
+ if (jobId.isPresent()) {
+ System.arraycopy(jobId.get().getBytes(), 0, gtridBuffer, 0, 16);
Review comment:
nit: how about using public `JobID.SIZE` instead of 16 here and in other
places (in declaration we can add 8 bytes)?
##########
File path:
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java
##########
@@ -40,21 +40,25 @@ public void testXidsUniqueAmongCheckpoints() {
}
@Test
- public void testXidsUniqueAmongGenerators() {
+ public void testXidsUniqueAmongJobs() {
long checkpointId = 1L;
+ SemanticXidGenerator generator = new SemanticXidGenerator();
checkUniqueness(
unused -> {
- SemanticXidGenerator generator = new
SemanticXidGenerator();
generator.open();
return generator.generateXid(TEST_RUNTIME_CONTEXT,
checkpointId);
});
}
private void checkUniqueness(Function<Integer, Xid> generate) {
Set<Xid> generated = new HashSet<>();
+ Set<byte[]> gtrids = new HashSet<>();
Review comment:
I'm afraid `byte[]` can't be used as a hashtable key because it's
equals/hashCode doesn't take array contents into accout.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]