rkhachatryan commented on a change in pull request #15066:
URL: https://github.com/apache/flink/pull/15066#discussion_r590307472



##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java
##########
@@ -46,20 +50,29 @@
 
     private static final int FORMAT_ID = 201;
 
-    private transient byte[] gtridBuffer; // globalTransactionId = checkpoint id (long)
-    private transient byte[] bqualBuffer; // branchQualifier = task index + random bytes
+    private transient byte[]
+            gtridBuffer; // globalTransactionId = job id + task index + checkpoint id
+    private transient byte[] bqualBuffer; // branchQualifier = random bytes
 
     @Override
     public void open() {
-        bqualBuffer = getRandomBytes(Long.BYTES);
-        gtridBuffer = new byte[Long.BYTES];
+        gtridBuffer = new byte[3 * Long.BYTES];
+        bqualBuffer = getRandomBytes(Integer.BYTES);
     }
 
     @Override
     public Xid generateXid(RuntimeContext runtimeContext, long checkpointId) {
-        writeNumber(runtimeContext.getIndexOfThisSubtask(), gtridBuffer, 0);
-        // deliberately write only 4 bytes of checkpoint id and rely on random generation
-        writeNumber((int) checkpointId, gtridBuffer, Integer.BYTES);
+        Optional<JobID> jobId = runtimeContext.getJobId();
+        if (jobId.isPresent()) {
+            System.arraycopy(jobId.get().getBytes(), 0, gtridBuffer, 0, 16);

Review comment:
       nit: how about using the public `JobID.SIZE` constant instead of the literal 16, here and in the other places? (In the buffer declaration we can add 8 bytes to it.)
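To illustrate the suggestion, here is a minimal sketch of sizing and filling the buffer from a named constant. It is not the PR's code: `JOB_ID_SIZE` is a hypothetical local stand-in for `JobID.SIZE` so the snippet compiles without Flink, the split of the remaining 8 bytes into a 4-byte task index and a 4-byte checkpoint id is an assumption based on the removed lines, and the big-endian byte order comes from `ByteBuffer`, not from the PR's `writeNumber` helper.

```java
import java.nio.ByteBuffer;

public class GtridLayoutSketch {
    // Hypothetical stand-in for JobID.SIZE (16 bytes); real code would reference JobID.SIZE.
    static final int JOB_ID_SIZE = 2 * Long.BYTES;

    // Assumed layout: job id (16) + task index (4) + checkpoint id (4) = 24 bytes.
    static final int GTRID_SIZE = JOB_ID_SIZE + Integer.BYTES + Integer.BYTES;

    /** Builds a gtrid under the assumed layout; jobIdBytes must hold at least JOB_ID_SIZE bytes. */
    static byte[] buildGtrid(byte[] jobIdBytes, int subtaskIndex, long checkpointId) {
        ByteBuffer buffer = ByteBuffer.allocate(GTRID_SIZE);
        buffer.put(jobIdBytes, 0, JOB_ID_SIZE); // no magic 16 at the copy site
        buffer.putInt(subtaskIndex);
        buffer.putInt((int) checkpointId); // keeps only the low 4 bytes, as the removed code did
        return buffer.array();
    }

    public static void main(String[] args) {
        byte[] gtrid = buildGtrid(new byte[JOB_ID_SIZE], 1, 2L);
        System.out.println(gtrid.length);
    }
}
```

With the constant in place, the allocation size and the copy length stay in sync if the job id width ever changes.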

##########
File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java
##########
@@ -40,21 +40,25 @@ public void testXidsUniqueAmongCheckpoints() {
     }
 
     @Test
-    public void testXidsUniqueAmongGenerators() {
+    public void testXidsUniqueAmongJobs() {
         long checkpointId = 1L;
+        SemanticXidGenerator generator = new SemanticXidGenerator();
         checkUniqueness(
                 unused -> {
-                    SemanticXidGenerator generator = new SemanticXidGenerator();
                     generator.open();
                     return generator.generateXid(TEST_RUNTIME_CONTEXT, checkpointId);
                 });
     }
 
     private void checkUniqueness(Function<Integer, Xid> generate) {
         Set<Xid> generated = new HashSet<>();
+        Set<byte[]> gtrids = new HashSet<>();

Review comment:
       I'm afraid `byte[]` can't be used as a hash table key because its equals/hashCode don't take the array contents into account.
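A small sketch of the problem, plus one possible workaround. The class and method names are illustrative only, and wrapping each array in a `ByteBuffer` is just one option (comparing with `Arrays.equals` or storing a `String`/`List<Byte>` form would also work); it is not necessarily what the test should end up using.

```java
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;

public class ByteArrayKeyDemo {
    /** Set<byte[]> falls back to Object identity, so equal contents are not deduplicated. */
    static int countByIdentity(byte[]... arrays) {
        Set<byte[]> set = new HashSet<>();
        for (byte[] array : arrays) {
            set.add(array);
        }
        return set.size();
    }

    /** ByteBuffer implements equals/hashCode over its remaining bytes, so contents are compared. */
    static int countByContent(byte[]... arrays) {
        Set<ByteBuffer> set = new HashSet<>();
        for (byte[] array : arrays) {
            set.add(ByteBuffer.wrap(array));
        }
        return set.size();
    }

    public static void main(String[] args) {
        byte[] first = {1, 2, 3};
        byte[] second = {1, 2, 3};
        System.out.println(countByIdentity(first, second)); // 2: distinct objects
        System.out.println(countByContent(first, second)); // 1: same contents
    }
}
```

In the test this means a `Set<byte[]>` would report every gtrid as unique even when two generators produce identical bytes, so the uniqueness assertion could never fail.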
   




