This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 425baea [FLINK-21044][connectors/jdbc] Use more random bytes in Xid
425baea is described below
commit 425baea4af4cdebbcb7e87d479cb04e62de26d33
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Jan 20 07:19:35 2021 +0100
[FLINK-21044][connectors/jdbc] Use more random bytes in Xid
With only 4 random bytes, testXidsUniqueAmongGenerators fails
once per ~100 runs as it deliberately uses same checkpointId
and subtaskIdx.
This change makes SemanticXidGenerator use 8 random bytes
instead of 4 higher bytes of checkpointId.
---
.../connector/jdbc/xa/SemanticXidGenerator.java | 27 ++++++++--------------
1 file changed, 9 insertions(+), 18 deletions(-)
diff --git
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java
index 45c5d02..5db7ad6 100644
---
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java
+++
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java
@@ -28,9 +28,9 @@ import java.security.SecureRandom;
* Generates {@link Xid} from:
*
* <ol>
- * <li>checkpoint id
+ * <li>checkpoint id (4 bytes)
* <li>subtask index
- * <li>4 random bytes to provide uniqueness across other jobs and apps
(generated at startup using
+ * <li>8 random bytes to provide uniqueness across other jobs and apps
(generated at startup using
* {@link SecureRandom})
* </ol>
*
@@ -45,38 +45,29 @@ class SemanticXidGenerator implements XidGenerator {
private static final SecureRandom SECURE_RANDOM = new SecureRandom();
private static final int FORMAT_ID = 201;
- private static final int BQUAL_DYN_PART_LEN =
- Integer.BYTES; // length of the branchQualifier dynamic part,
which is task index
- private static final int BQUAL_DYN_PART_POS = 0; // and it's starting
position
private transient byte[] gtridBuffer; // globalTransactionId = checkpoint
id (long)
private transient byte[] bqualBuffer; // branchQualifier = task index +
random bytes
@Override
public void open() {
- bqualBuffer = new byte[Long.BYTES];
- byte[] bqualStaticPart = getRandomBytes(bqualBuffer.length -
BQUAL_DYN_PART_LEN);
- System.arraycopy(
- bqualStaticPart, 0, bqualBuffer, BQUAL_DYN_PART_LEN,
bqualStaticPart.length);
+ bqualBuffer = getRandomBytes(Long.BYTES);
gtridBuffer = new byte[Long.BYTES];
}
@Override
public Xid generateXid(RuntimeContext runtimeContext, long checkpointId) {
- writeNumber(
- runtimeContext.getIndexOfThisSubtask(),
- Integer.BYTES,
- bqualBuffer,
- BQUAL_DYN_PART_POS);
- writeNumber(checkpointId, Long.BYTES, gtridBuffer, 0);
+ writeNumber(runtimeContext.getIndexOfThisSubtask(), gtridBuffer, 0);
+ // deliberately write only 4 bytes of checkpoint id and rely on random
generation
+ writeNumber((int) checkpointId, gtridBuffer, Integer.BYTES);
// relying on arrays copying inside XidImpl constructor
return new XidImpl(FORMAT_ID, gtridBuffer, bqualBuffer);
}
- private static void writeNumber(long number, int numberLength, byte[] dst,
int dstPos) {
- for (int i = dstPos; i < numberLength; i++) {
+ private static void writeNumber(int number, byte[] dst, int dstOffset) {
+ for (int i = dstOffset; i < dstOffset + Integer.BYTES; i++) {
dst[i] = (byte) number;
- number >>= 8;
+ number >>>= Byte.SIZE;
}
}