This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 425baea  [FLINK-21044][connectors/jdbc] Use more random bytes in Xid
425baea is described below

commit 425baea4af4cdebbcb7e87d479cb04e62de26d33
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Jan 20 07:19:35 2021 +0100

    [FLINK-21044][connectors/jdbc] Use more random bytes in Xid
    
    With only 4 random bytes, testXidsUniqueAmongGenerators fails
    once per ~100 runs as it deliberately uses same checkpointId
    and subtaskIdx.
    
    This change makes SemanticXidGenerator use 8 random bytes
    instead of 4 higher bytes of checkpointId.
---
 .../connector/jdbc/xa/SemanticXidGenerator.java    | 27 ++++++++--------------
 1 file changed, 9 insertions(+), 18 deletions(-)

diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java
index 45c5d02..5db7ad6 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java
@@ -28,9 +28,9 @@ import java.security.SecureRandom;
  * Generates {@link Xid} from:
  *
  * <ol>
- *   <li>checkpoint id
+ *   <li>checkpoint id (4 bytes)
  *   <li>subtask index
- *   <li>4 random bytes to provide uniqueness across other jobs and apps 
(generated at startup using
+ *   <li>8 random bytes to provide uniqueness across other jobs and apps 
(generated at startup using
  *       {@link SecureRandom})
  * </ol>
  *
@@ -45,38 +45,29 @@ class SemanticXidGenerator implements XidGenerator {
     private static final SecureRandom SECURE_RANDOM = new SecureRandom();
 
     private static final int FORMAT_ID = 201;
-    private static final int BQUAL_DYN_PART_LEN =
-            Integer.BYTES; // length of the branchQualifier dynamic part, 
which is task index
-    private static final int BQUAL_DYN_PART_POS = 0; // and it's starting 
position
 
     private transient byte[] gtridBuffer; // globalTransactionId = checkpoint 
id (long)
     private transient byte[] bqualBuffer; // branchQualifier = task index + 
random bytes
 
     @Override
     public void open() {
-        bqualBuffer = new byte[Long.BYTES];
-        byte[] bqualStaticPart = getRandomBytes(bqualBuffer.length - 
BQUAL_DYN_PART_LEN);
-        System.arraycopy(
-                bqualStaticPart, 0, bqualBuffer, BQUAL_DYN_PART_LEN, 
bqualStaticPart.length);
+        bqualBuffer = getRandomBytes(Long.BYTES);
         gtridBuffer = new byte[Long.BYTES];
     }
 
     @Override
     public Xid generateXid(RuntimeContext runtimeContext, long checkpointId) {
-        writeNumber(
-                runtimeContext.getIndexOfThisSubtask(),
-                Integer.BYTES,
-                bqualBuffer,
-                BQUAL_DYN_PART_POS);
-        writeNumber(checkpointId, Long.BYTES, gtridBuffer, 0);
+        writeNumber(runtimeContext.getIndexOfThisSubtask(), gtridBuffer, 0);
+        // deliberately write only 4 bytes of checkpoint id and rely on random 
generation
+        writeNumber((int) checkpointId, gtridBuffer, Integer.BYTES);
         // relying on arrays copying inside XidImpl constructor
         return new XidImpl(FORMAT_ID, gtridBuffer, bqualBuffer);
     }
 
-    private static void writeNumber(long number, int numberLength, byte[] dst, 
int dstPos) {
-        for (int i = dstPos; i < numberLength; i++) {
+    private static void writeNumber(int number, byte[] dst, int dstOffset) {
+        for (int i = dstOffset; i < dstOffset + Integer.BYTES; i++) {
             dst[i] = (byte) number;
-            number >>= 8;
+            number >>>= Byte.SIZE;
         }
     }
 

Reply via email to