RocMarshal commented on code in PR #2:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1529646574


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java:
##########
@@ -0,0 +1,273 @@
+package org.apache.flink.connector.jdbc.datasource.transactions.xa.domain;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+/** A simple {@link Xid} implementation. */
+@Internal
+public class TransactionId implements Xid, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int FORMAT_ID = 202;
+
+    private final byte[] jobId;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+    private final long checkpointId;
+    private final int attempts;
+    private final boolean restored;
+
+    private TransactionId(
+            byte[] jobId,
+            int subtaskId,
+            int numberOfSubtasks,
+            long checkpointId,
+            int attempts,
+            boolean restored) {
+        this.jobId = jobId;
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointId = checkpointId;
+        this.attempts = attempts;
+        this.restored = restored;
+    }
+
+    public static TransactionId empty() {
+        return create(new byte[JobID.SIZE], 0, 0);
+    }
+
+    public static TransactionId create(byte[] jobId, int subtaskId, int 
numberOfSubtasks) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, -1, 0, 
false);
+    }
+
+    public static TransactionId restore(
+            byte[] jobId, int subtaskId, int numberOfSubtasks, long 
checkpointId, int attempts) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, 
checkpointId, attempts, true);
+    }
+
+    public static TransactionId createFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) 
throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, false);
+    }
+
+    public static TransactionId restoreFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) 
throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, true);
+    }
+
+    public static TransactionId fromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier, 
boolean restored)
+            throws IOException {
+        if (FORMAT_ID != formatId) {
+            throw new IOException(String.format("Xid formatId (%s) is not 
valid", formatId));
+        }
+
+        final DataInputDeserializer gid = new 
DataInputDeserializer(globalTransactionId);
+        byte[] jobIdBytes = readJobId(gid);
+        int subtaskId = gid.readInt();
+
+        final DataInputDeserializer branch = new 
DataInputDeserializer(branchQualifier);
+        int numberOfSubtasks = branch.readInt();
+        long checkpoint = branch.readLong();
+
+        return new TransactionId(jobIdBytes, subtaskId, numberOfSubtasks, 
checkpoint, 0, restored);
+    }
+
+    public static TransactionId deserialize(byte[] bytes) {
+        try {
+            final DataInputDeserializer in = new DataInputDeserializer(bytes);
+            byte[] jobIdBytes = readJobId(in);
+            int subtaskId = in.readInt();
+            int numberOfSubtasks = in.readInt();
+            long checkpoint = in.readLong();
+            int attempts = in.readInt();
+            return restore(jobIdBytes, subtaskId, numberOfSubtasks, 
checkpoint, attempts);
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private static byte[] readJobId(DataInputDeserializer in) throws 
IOException {
+        byte[] jobIdBytes = new byte[JobID.SIZE];
+        in.read(jobIdBytes);
+        return jobIdBytes;
+    }
+
+    public TransactionId copy() {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, 
restored);
+    }
+
+    public TransactionId withBranch(long checkpointId) {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, 
restored);
+    }
+
+    public TransactionId withAttemptsIncremented() {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts + 
1, restored);
+    }
+
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    public boolean getRestored() {
+        return restored;
+    }
+
+    public int getAttempts() {
+        return attempts;
+    }
+
+    public String getXidValue() {
+        return String.format(
+                "%s:%s:%s",
+                getFormatId(),
+                byteToHexString(getGlobalTransactionId()),
+                byteToHexString(getBranchQualifier()));
+    }
+
+    @Override
+    public int getFormatId() {
+        return FORMAT_ID;
+    }
+
+    @Override
+    public byte[] getGlobalTransactionId() {
+        try {
+            // globalTransactionId = job id + task index
+            final DataOutputSerializer out = new DataOutputSerializer(1);
+            out.write(jobId, 0, JobID.SIZE);
+            out.writeInt(subtaskId);
+            return out.getSharedBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());
+        }
+    }
+
+    @Override
+    public byte[] getBranchQualifier() {
+        Preconditions.checkArgument(checkpointId >= 0, "No branch was 
initialized");
+        try {
+            // branchQualifier = numberOfSubtasks + checkpoint id
+            final DataOutputSerializer out = new DataOutputSerializer(1);
+            out.writeInt(numberOfSubtasks);
+            out.writeLong(checkpointId);
+            return out.getSharedBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());
+        }
+    }
+
+    public byte[] serialize() {
+        try {
+            final DataOutputSerializer out = new DataOutputSerializer(1);
+            out.write(jobId, 0, JobID.SIZE);
+            out.writeInt(subtaskId);
+            out.writeInt(numberOfSubtasks);
+            out.writeLong(checkpointId);
+            out.writeInt(attempts);
+            return out.getSharedBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());

Review Comment:
   ditto



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java:
##########
@@ -0,0 +1,273 @@
+package org.apache.flink.connector.jdbc.datasource.transactions.xa.domain;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+/** A simple {@link Xid} implementation. */
+@Internal
+public class TransactionId implements Xid, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int FORMAT_ID = 202;
+
+    private final byte[] jobId;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+    private final long checkpointId;
+    private final int attempts;
+    private final boolean restored;
+
+    private TransactionId(
+            byte[] jobId,
+            int subtaskId,
+            int numberOfSubtasks,
+            long checkpointId,
+            int attempts,
+            boolean restored) {
+        this.jobId = jobId;
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointId = checkpointId;
+        this.attempts = attempts;
+        this.restored = restored;
+    }
+
+    public static TransactionId empty() {
+        return create(new byte[JobID.SIZE], 0, 0);
+    }
+
+    public static TransactionId create(byte[] jobId, int subtaskId, int 
numberOfSubtasks) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, -1, 0, 
false);
+    }
+
+    public static TransactionId restore(
+            byte[] jobId, int subtaskId, int numberOfSubtasks, long 
checkpointId, int attempts) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, 
checkpointId, attempts, true);
+    }
+
+    public static TransactionId createFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) 
throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, false);
+    }
+
+    public static TransactionId restoreFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) 
throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, true);
+    }
+
+    public static TransactionId fromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier, 
boolean restored)
+            throws IOException {
+        if (FORMAT_ID != formatId) {
+            throw new IOException(String.format("Xid formatId (%s) is not 
valid", formatId));
+        }
+
+        final DataInputDeserializer gid = new 
DataInputDeserializer(globalTransactionId);
+        byte[] jobIdBytes = readJobId(gid);
+        int subtaskId = gid.readInt();
+
+        final DataInputDeserializer branch = new 
DataInputDeserializer(branchQualifier);
+        int numberOfSubtasks = branch.readInt();
+        long checkpoint = branch.readLong();
+
+        return new TransactionId(jobIdBytes, subtaskId, numberOfSubtasks, 
checkpoint, 0, restored);
+    }
+
+    public static TransactionId deserialize(byte[] bytes) {
+        try {
+            final DataInputDeserializer in = new DataInputDeserializer(bytes);
+            byte[] jobIdBytes = readJobId(in);
+            int subtaskId = in.readInt();
+            int numberOfSubtasks = in.readInt();
+            long checkpoint = in.readLong();
+            int attempts = in.readInt();
+            return restore(jobIdBytes, subtaskId, numberOfSubtasks, 
checkpoint, attempts);
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private static byte[] readJobId(DataInputDeserializer in) throws 
IOException {
+        byte[] jobIdBytes = new byte[JobID.SIZE];
+        in.read(jobIdBytes);
+        return jobIdBytes;
+    }
+
+    public TransactionId copy() {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, 
restored);
+    }
+
+    public TransactionId withBranch(long checkpointId) {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, 
restored);
+    }
+
+    public TransactionId withAttemptsIncremented() {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts + 
1, restored);
+    }
+
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    public boolean getRestored() {
+        return restored;
+    }
+
+    public int getAttempts() {
+        return attempts;
+    }
+
+    public String getXidValue() {
+        return String.format(
+                "%s:%s:%s",
+                getFormatId(),
+                byteToHexString(getGlobalTransactionId()),
+                byteToHexString(getBranchQualifier()));
+    }
+
+    @Override
+    public int getFormatId() {
+        return FORMAT_ID;
+    }
+
+    @Override
+    public byte[] getGlobalTransactionId() {
+        try {
+            // globalTransactionId = job id + task index
+            final DataOutputSerializer out = new DataOutputSerializer(1);
+            out.write(jobId, 0, JobID.SIZE);
+            out.writeInt(subtaskId);
+            return out.getSharedBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());
+        }
+    }
+
+    @Override
+    public byte[] getBranchQualifier() {
+        Preconditions.checkArgument(checkpointId >= 0, "No branch was 
initialized");
+        try {
+            // branchQualifier = numberOfSubtasks + checkpoint id
+            final DataOutputSerializer out = new DataOutputSerializer(1);
+            out.writeInt(numberOfSubtasks);
+            out.writeLong(checkpointId);
+            return out.getSharedBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());

Review Comment:
   ditto



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java:
##########
@@ -0,0 +1,273 @@
+package org.apache.flink.connector.jdbc.datasource.transactions.xa.domain;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+/** A simple {@link Xid} implementation. */
+@Internal
+public class TransactionId implements Xid, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int FORMAT_ID = 202;
+
+    private final byte[] jobId;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+    private final long checkpointId;
+    private final int attempts;
+    private final boolean restored;
+
+    private TransactionId(
+            byte[] jobId,
+            int subtaskId,
+            int numberOfSubtasks,
+            long checkpointId,
+            int attempts,
+            boolean restored) {
+        this.jobId = jobId;
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointId = checkpointId;
+        this.attempts = attempts;
+        this.restored = restored;
+    }
+
+    public static TransactionId empty() {
+        return create(new byte[JobID.SIZE], 0, 0);
+    }
+
+    public static TransactionId create(byte[] jobId, int subtaskId, int 
numberOfSubtasks) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, -1, 0, 
false);
+    }
+
+    public static TransactionId restore(
+            byte[] jobId, int subtaskId, int numberOfSubtasks, long 
checkpointId, int attempts) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, 
checkpointId, attempts, true);
+    }
+
+    public static TransactionId createFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) 
throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, false);
+    }
+
+    public static TransactionId restoreFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) 
throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, true);
+    }
+
+    public static TransactionId fromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier, 
boolean restored)
+            throws IOException {
+        if (FORMAT_ID != formatId) {
+            throw new IOException(String.format("Xid formatId (%s) is not 
valid", formatId));
+        }
+
+        final DataInputDeserializer gid = new 
DataInputDeserializer(globalTransactionId);
+        byte[] jobIdBytes = readJobId(gid);
+        int subtaskId = gid.readInt();
+
+        final DataInputDeserializer branch = new 
DataInputDeserializer(branchQualifier);
+        int numberOfSubtasks = branch.readInt();
+        long checkpoint = branch.readLong();
+
+        return new TransactionId(jobIdBytes, subtaskId, numberOfSubtasks, 
checkpoint, 0, restored);
+    }
+
+    public static TransactionId deserialize(byte[] bytes) {
+        try {
+            final DataInputDeserializer in = new DataInputDeserializer(bytes);
+            byte[] jobIdBytes = readJobId(in);
+            int subtaskId = in.readInt();
+            int numberOfSubtasks = in.readInt();
+            long checkpoint = in.readLong();
+            int attempts = in.readInt();
+            return restore(jobIdBytes, subtaskId, numberOfSubtasks, 
checkpoint, attempts);
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private static byte[] readJobId(DataInputDeserializer in) throws 
IOException {
+        byte[] jobIdBytes = new byte[JobID.SIZE];
+        in.read(jobIdBytes);
+        return jobIdBytes;
+    }
+
+    public TransactionId copy() {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, 
restored);
+    }
+
+    public TransactionId withBranch(long checkpointId) {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, 
restored);
+    }
+
+    public TransactionId withAttemptsIncremented() {
+        return new TransactionId(
+                jobId, subtaskId, numberOfSubtasks, checkpointId, attempts + 
1, restored);
+    }
+
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    public boolean getRestored() {
+        return restored;
+    }
+
+    public int getAttempts() {
+        return attempts;
+    }
+
+    public String getXidValue() {
+        return String.format(
+                "%s:%s:%s",
+                getFormatId(),
+                byteToHexString(getGlobalTransactionId()),
+                byteToHexString(getBranchQualifier()));
+    }
+
+    @Override
+    public int getFormatId() {
+        return FORMAT_ID;
+    }
+
+    @Override
+    public byte[] getGlobalTransactionId() {
+        try {
+            // globalTransactionId = job id + task index
+            final DataOutputSerializer out = new DataOutputSerializer(1);
+            out.write(jobId, 0, JobID.SIZE);
+            out.writeInt(subtaskId);
+            return out.getSharedBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());

Review Comment:
   just a little puzzle.
   Why not use `throw new FlinkRuntimeException(e);` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to