This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 17cede9 KUDU-2612: Java client sets txnId in WriteRequestPB
17cede9 is described below
commit 17cede9da2c7213e05dcca921a0e767dda54f131
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Oct 23 20:04:13 2020 -0700
KUDU-2612: Java client sets txnId in WriteRequestPB
With this patch, WriteRequestPB messages generated by a transactional
AsyncKuduSession have their txnId field populated with corresponding
transaction identifier. For more information about the assumed changes
in the client API see e64eb7c7ceceec76aeb5cceac9dc42cc0e78f1ec.
It's assumed that follow-up changelists will add more comprehensive
end-to-end coverage once transactional API for Kudu client is introduced
and tablet servers process the WriteRequestPB:txn_id field as prescribed
by the design document [1].
[1] https://s.apache.org/kudu-multi-row-transaction-design
Change-Id: I2863ae97541c2124230b3af31acc75d42cd4c6df
Reviewed-on: http://gerrit.cloudera.org:8080/16644
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <[email protected]>
---
.../org/apache/kudu/client/AsyncKuduClient.java | 1 +
.../org/apache/kudu/client/AsyncKuduSession.java | 22 +++++++++++++++++++++-
.../main/java/org/apache/kudu/client/Batch.java | 8 +++++++-
.../java/org/apache/kudu/client/Operation.java | 18 ++++++++++++++++++
4 files changed, 47 insertions(+), 2 deletions(-)
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 0d4d00c..bc01475 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -269,6 +269,7 @@ public class AsyncKuduClient implements AutoCloseable {
public static final int SLEEP_TIME = 500;
public static final byte[] EMPTY_ARRAY = new byte[0];
public static final long NO_TIMESTAMP = -1;
+ public static final long INVALID_TXN_ID = -1;
public static final long DEFAULT_OPERATION_TIMEOUT_MS = 30000;
public static final long DEFAULT_KEEP_ALIVE_PERIOD_MS = 15000; // 25% of the
default scanner ttl.
private static final long MAX_RPC_ATTEMPTS = 100;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 90eb526..d680b12 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -128,6 +128,7 @@ public class AsyncKuduSession implements
SessionConfiguration {
private FlushMode flushMode;
private ExternalConsistencyMode consistencyMode;
private long timeoutMillis;
+ private final long txnId;
/**
* Protects internal state from concurrent access. {@code AsyncKuduSession}
is not threadsafe
@@ -188,6 +189,24 @@ public class AsyncKuduSession implements
SessionConfiguration {
*/
AsyncKuduSession(AsyncKuduClient client) {
this.client = client;
+ this.txnId = AsyncKuduClient.INVALID_TXN_ID;
+ flushMode = FlushMode.AUTO_FLUSH_SYNC;
+ consistencyMode = CLIENT_PROPAGATED;
+ timeoutMillis = client.getDefaultOperationTimeoutMs();
+ inactiveBuffers.add(bufferA);
+ inactiveBuffers.add(bufferB);
+ errorCollector = new ErrorCollector(mutationBufferMaxOps);
+ }
+
+ /**
+ * Constructor for a transactional session.
+ * @param client client that creates this session
+ * @param txnId transaction identifier for all operations within the session
+ */
+ AsyncKuduSession(AsyncKuduClient client, long txnId) {
+ assert txnId >= 0;
+ this.client = client;
+ this.txnId = txnId;
flushMode = FlushMode.AUTO_FLUSH_SYNC;
consistencyMode = CLIENT_PROPAGATED;
timeoutMillis = client.getDefaultOperationTimeoutMs();
@@ -386,7 +405,7 @@ public class AsyncKuduSession implements
SessionConfiguration {
Batch batch = batches.get(tabletId);
if (batch == null) {
batch = new Batch(operation.getTable(), tablet,
ignoreAllDuplicateRows,
- ignoreAllNotFoundRows);
+ ignoreAllNotFoundRows, txnId);
batches.put(tabletId, batch);
}
batch.add(operation, currentIndex++);
@@ -525,6 +544,7 @@ public class AsyncKuduSession implements
SessionConfiguration {
operation.setExternalConsistencyMode(consistencyMode);
operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
operation.setIgnoreAllNotFoundRows(ignoreAllNotFoundRows);
+ operation.setTxnId(txnId);
return client.sendRpcToTablet(operation)
.addCallbackDeferring(resp -> {
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index 124a8ab..0c5c578 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -64,8 +64,10 @@ class Batch extends KuduRpc<BatchResponse> {
private final EnumSet<ErrorCode> ignoredErrors;
+ private final long txnId;
+
Batch(KuduTable table, LocatedTablet tablet, boolean ignoreAllDuplicateRows,
- boolean ignoreAllNotFoundRows) {
+ boolean ignoreAllNotFoundRows, long txnId) {
super(table, null, 0);
// Build a set of ignored errors.
Set<ErrorCode> ignoredErrors = new HashSet<>();
@@ -82,6 +84,7 @@ class Batch extends KuduRpc<BatchResponse> {
this.ignoredErrors = EnumSet.copyOf(ignoredErrors);
}
this.tablet = tablet;
+ this.txnId = txnId;
}
/**
@@ -148,6 +151,9 @@ class Batch extends KuduRpc<BatchResponse> {
if (authzToken != null) {
builder.setAuthzToken(authzToken);
}
+ if (this.txnId != AsyncKuduClient.INVALID_TXN_ID) {
+ builder.setTxnId(this.txnId);
+ }
return builder.build();
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index ce59aa2..8cc9edb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -97,6 +97,11 @@ public abstract class Operation extends
KuduRpc<OperationResponse> {
boolean ignoreAllDuplicateRows = false;
/** See {@link SessionConfiguration#setIgnoreAllNotFoundRows(boolean)} */
boolean ignoreAllNotFoundRows = false;
+ /**
+ * Transaction identifier for the generated WriteRequestPB. Applicable only
+ * if set to a valid value.
+ */
+ long txnId = AsyncKuduClient.INVALID_TXN_ID;
/**
* Package-private constructor. Subclasses need to be instantiated via
AsyncKuduSession
@@ -135,6 +140,16 @@ public abstract class Operation extends
KuduRpc<OperationResponse> {
}
/**
+ * Set transaction identifier for this operation. If set, the transaction
+ * identifier is propagated into the generated WriteRequestPB.
+ *
+ * @param txnId transaction identifier to set
+ */
+ void setTxnId(long txnId) {
+ this.txnId = txnId;
+ }
+
+ /**
* Classes extending Operation need to have a specific ChangeType
* @return Operation's ChangeType
*/
@@ -186,6 +201,9 @@ public abstract class Operation extends
KuduRpc<OperationResponse> {
if (authzToken != null) {
builder.setAuthzToken(authzToken);
}
+ if (this.txnId != AsyncKuduClient.INVALID_TXN_ID) {
+ builder.setTxnId(this.txnId);
+ }
return builder.build();
}