This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 17cede9  KUDU-2612: Java client sets txnId in WriteRequestPB
17cede9 is described below

commit 17cede9da2c7213e05dcca921a0e767dda54f131
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Oct 23 20:04:13 2020 -0700

    KUDU-2612: Java client sets txnId in WriteRequestPB
    
    With this patch, WriteRequestPB messages generated by a transactional
    AsyncKuduSession have their txnId field populated with corresponding
    transaction identifier.  For more information about the assumed changes
    in the client API see e64eb7c7ceceec76aeb5cceac9dc42cc0e78f1ec.
    
    It's assumed that follow-up changelists will add more comprehensive
    end-to-end coverage once the transactional API for the Kudu client is
    introduced and tablet servers process the WriteRequestPB:txn_id field as
    prescribed by the design document [1].
    
    [1] https://s.apache.org/kudu-multi-row-transaction-design
    
    Change-Id: I2863ae97541c2124230b3af31acc75d42cd4c6df
    Reviewed-on: http://gerrit.cloudera.org:8080/16644
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <[email protected]>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    |  1 +
 .../org/apache/kudu/client/AsyncKuduSession.java   | 22 +++++++++++++++++++++-
 .../main/java/org/apache/kudu/client/Batch.java    |  8 +++++++-
 .../java/org/apache/kudu/client/Operation.java     | 18 ++++++++++++++++++
 4 files changed, 47 insertions(+), 2 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 0d4d00c..bc01475 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -269,6 +269,7 @@ public class AsyncKuduClient implements AutoCloseable {
   public static final int SLEEP_TIME = 500;
   public static final byte[] EMPTY_ARRAY = new byte[0];
   public static final long NO_TIMESTAMP = -1;
+  public static final long INVALID_TXN_ID = -1;
   public static final long DEFAULT_OPERATION_TIMEOUT_MS = 30000;
   public static final long DEFAULT_KEEP_ALIVE_PERIOD_MS = 15000; // 25% of the default scanner ttl.
   private static final long MAX_RPC_ATTEMPTS = 100;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 90eb526..d680b12 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -128,6 +128,7 @@ public class AsyncKuduSession implements SessionConfiguration {
   private FlushMode flushMode;
   private ExternalConsistencyMode consistencyMode;
   private long timeoutMillis;
+  private final long txnId;
 
   /**
    * Protects internal state from concurrent access. {@code AsyncKuduSession} is not threadsafe
@@ -188,6 +189,24 @@ public class AsyncKuduSession implements SessionConfiguration {
    */
   AsyncKuduSession(AsyncKuduClient client) {
     this.client = client;
+    this.txnId = AsyncKuduClient.INVALID_TXN_ID;
+    flushMode = FlushMode.AUTO_FLUSH_SYNC;
+    consistencyMode = CLIENT_PROPAGATED;
+    timeoutMillis = client.getDefaultOperationTimeoutMs();
+    inactiveBuffers.add(bufferA);
+    inactiveBuffers.add(bufferB);
+    errorCollector = new ErrorCollector(mutationBufferMaxOps);
+  }
+
+  /**
+   * Constructor for a transactional session.
+   * @param client client that creates this session
+   * @param txnId transaction identifier for all operations within the session
+   */
+  AsyncKuduSession(AsyncKuduClient client, long txnId) {
+    assert txnId >= 0;
+    this.client = client;
+    this.txnId = txnId;
     flushMode = FlushMode.AUTO_FLUSH_SYNC;
     consistencyMode = CLIENT_PROPAGATED;
     timeoutMillis = client.getDefaultOperationTimeoutMs();
@@ -386,7 +405,7 @@ public class AsyncKuduSession implements SessionConfiguration {
         Batch batch = batches.get(tabletId);
         if (batch == null) {
           batch = new Batch(operation.getTable(), tablet, ignoreAllDuplicateRows,
-              ignoreAllNotFoundRows);
+              ignoreAllNotFoundRows, txnId);
           batches.put(tabletId, batch);
         }
         batch.add(operation, currentIndex++);
@@ -525,6 +544,7 @@ public class AsyncKuduSession implements SessionConfiguration {
     operation.setExternalConsistencyMode(consistencyMode);
     operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
     operation.setIgnoreAllNotFoundRows(ignoreAllNotFoundRows);
+    operation.setTxnId(txnId);
 
     return client.sendRpcToTablet(operation)
         .addCallbackDeferring(resp -> {
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index 124a8ab..0c5c578 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -64,8 +64,10 @@ class Batch extends KuduRpc<BatchResponse> {
 
   private final EnumSet<ErrorCode> ignoredErrors;
 
+  private final long txnId;
+
   Batch(KuduTable table, LocatedTablet tablet, boolean ignoreAllDuplicateRows,
-        boolean ignoreAllNotFoundRows) {
+        boolean ignoreAllNotFoundRows, long txnId) {
     super(table, null, 0);
     // Build a set of ignored errors.
     Set<ErrorCode> ignoredErrors = new HashSet<>();
@@ -82,6 +84,7 @@ class Batch extends KuduRpc<BatchResponse> {
       this.ignoredErrors = EnumSet.copyOf(ignoredErrors);
     }
     this.tablet = tablet;
+    this.txnId = txnId;
   }
 
   /**
@@ -148,6 +151,9 @@ class Batch extends KuduRpc<BatchResponse> {
     if (authzToken != null) {
       builder.setAuthzToken(authzToken);
     }
+    if (this.txnId != AsyncKuduClient.INVALID_TXN_ID) {
+      builder.setTxnId(this.txnId);
+    }
     return builder.build();
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index ce59aa2..8cc9edb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -97,6 +97,11 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
   boolean ignoreAllDuplicateRows = false;
   /** See {@link SessionConfiguration#setIgnoreAllNotFoundRows(boolean)} */
   boolean ignoreAllNotFoundRows = false;
+  /**
+   * Transaction identifier for the generated WriteRequestPB. Applicable only
+   * if set to a valid value.
+   */
+  long txnId = AsyncKuduClient.INVALID_TXN_ID;
 
   /**
    * Package-private constructor. Subclasses need to be instantiated via AsyncKuduSession
@@ -135,6 +140,16 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
   }
 
   /**
+   * Set transaction identifier for this operation. If set, the transaction
+   * identifier is propagated into the generated WriteRequestPB.
+   *
+   * @param txnId transaction identifier to set
+   */
+  void setTxnId(long txnId) {
+    this.txnId = txnId;
+  }
+
+  /**
    * Classes extending Operation need to have a specific ChangeType
    * @return Operation's ChangeType
    */
@@ -186,6 +201,9 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
     if (authzToken != null) {
       builder.setAuthzToken(authzToken);
     }
+    if (this.txnId != AsyncKuduClient.INVALID_TXN_ID) {
+      builder.setTxnId(this.txnId);
+    }
     return builder.build();
   }
 

Reply via email to