This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 7432a7a8aa0c98f8d2177c25b99576b51ff33a93
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon Dec 7 23:44:21 2020 -0800

    KUDU-2612 Java client transaction implementation
    
    This patch implements the functionality behind the transaction-related
    API calls in the Java Kudu client (i.e. issuing RPC calls to TxnManager,
    retrying in case of transient errors, etc.).  Corresponding tests have
    been added as well.  Some of the newly introduced tests are disabled
    and should be re-enabled once the transaction commit orchestration is
    implemented.
    
    Change-Id: Ie0236875e7264877c3f7a13da4a5a3da6423786b
    Reviewed-on: http://gerrit.cloudera.org:8080/16929
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Hao Hao <[email protected]>
---
 .../kudu/client/AbortTransactionRequest.java       |  81 ++++
 .../kudu/client/AbortTransactionResponse.java      |  32 ++
 .../org/apache/kudu/client/AsyncKuduClient.java    |  12 +-
 .../kudu/client/BeginTransactionRequest.java       |  81 ++++
 .../kudu/client/BeginTransactionResponse.java      |  56 +++
 .../kudu/client/CommitTransactionRequest.java      |  82 ++++
 .../kudu/client/CommitTransactionResponse.java     |  32 ++
 .../kudu/client/GetTransactionStateRequest.java    |  85 ++++
 .../kudu/client/GetTransactionStateResponse.java   |  50 +++
 .../main/java/org/apache/kudu/client/KuduRpc.java  |   3 +-
 .../org/apache/kudu/client/KuduTransaction.java    | 164 +++++++-
 .../main/java/org/apache/kudu/client/RpcProxy.java |  53 ++-
 .../main/java/org/apache/kudu/client/Status.java   |  11 +
 .../apache/kudu/client/TestKuduTransaction.java    | 457 +++++++++++++++++++++
 src/kudu/transactions/txn_status_manager.cc        |  17 +
 15 files changed, 1200 insertions(+), 16 deletions(-)

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AbortTransactionRequest.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AbortTransactionRequest.java
new file mode 100644
index 0000000..ded18ab
--- /dev/null
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AbortTransactionRequest.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static 
org.apache.kudu.transactions.TxnManager.AbortTransactionResponsePB;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Message;
+import io.netty.util.Timer;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.kudu.transactions.TxnManager;
+import org.apache.kudu.util.Pair;
+
+/**
+ * A wrapper class for kudu.transactions.TxnManagerService.AbortTransaction 
RPC.
+ */
[email protected]
+class AbortTransactionRequest extends KuduRpc<AbortTransactionResponse> {
+  private static final List<Integer> featureFlags = ImmutableList.of();
+  final long txnId;
+
+  AbortTransactionRequest(
+      KuduTable masterTable, Timer timer, long timeoutMillis, long txnId) {
+    super(masterTable, timer, timeoutMillis);
+    Preconditions.checkArgument(txnId > AsyncKuduClient.INVALID_TXN_ID);
+    this.txnId = txnId;
+  }
+
+  @Override
+  Message createRequestPB() {
+    final TxnManager.AbortTransactionRequestPB.Builder b =
+        TxnManager.AbortTransactionRequestPB.newBuilder();
+    b.setTxnId(txnId);
+    return b.build();
+  }
+
+  @Override
+  String serviceName() {
+    return TXN_MANAGER_SERVICE_NAME;
+  }
+
+  @Override
+  String method() {
+    return "AbortTransaction";
+  }
+
+  @Override
+  Pair<AbortTransactionResponse, Object> deserialize(
+      final CallResponse callResponse, String serverUUID) throws KuduException 
{
+    final AbortTransactionResponsePB.Builder b = 
AbortTransactionResponsePB.newBuilder();
+    readProtobuf(callResponse.getPBMessage(), b);
+    AbortTransactionResponse response = new AbortTransactionResponse(
+        timeoutTracker.getElapsedMillis(), serverUUID);
+    return new Pair<>(response, b.hasError() ? b.getError() : null);
+  }
+
+  @Override
+  Collection<Integer> getRequiredFeatures() {
+    return featureFlags;
+  }
+}
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AbortTransactionResponse.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AbortTransactionResponse.java
new file mode 100644
index 0000000..3fd7c7a
--- /dev/null
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AbortTransactionResponse.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import com.google.common.base.Preconditions;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public class AbortTransactionResponse extends KuduRpcResponse {
+  /**
+   * @param elapsedMillis time in milliseconds since RPC creation to now
+   * @param serverUUID UUID of the server that sent the response
+   */
+  AbortTransactionResponse(long elapsedMillis, String serverUUID) {
+    super(elapsedMillis, serverUUID);
+  }
+}
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 6a9ab5a..5f2822f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -1448,7 +1448,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param timeoutMs the timeout in milliseconds for the fake RPC
    * @return created fake RPC
    */
-  private <R> KuduRpc<R> buildFakeRpc(
+  <R> KuduRpc<R> buildFakeRpc(
       @Nonnull final String method,
       @Nullable final KuduRpc<?> parent,
       long timeoutMs) {
@@ -1465,7 +1465,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param <R> the expected return type of the fake RPC
    * @return created fake RPC
    */
-  private <R> KuduRpc<R> buildFakeRpc(
+  <R> KuduRpc<R> buildFakeRpc(
       @Nonnull final String method,
       @Nullable final KuduRpc<?> parent) {
     return buildFakeRpc(method, parent, defaultAdminOperationTimeoutMs);
@@ -1474,7 +1474,7 @@ public class AsyncKuduClient implements AutoCloseable {
   /**
    * A fake RPC that is used for timeouts and will never be sent.
    */
-  private static class FakeKuduRpc<R> extends KuduRpc<R> {
+  static class FakeKuduRpc<R> extends KuduRpc<R> {
     private final String method;
 
     FakeKuduRpc(String method, Timer timer, long timeoutMillis) {
@@ -1713,7 +1713,7 @@ public class AsyncKuduClient implements AutoCloseable {
     }
   }
 
-  private long getSleepTimeForRpcMillis(KuduRpc<?> rpc) {
+  long getSleepTimeForRpcMillis(KuduRpc<?> rpc) {
     int attemptCount = rpc.attempt;
     if (attemptCount == 0) {
       // If this is the first RPC attempt, don't sleep at all.
@@ -1758,8 +1758,8 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param cause What was cause of the last failed attempt, if known.
    * You can pass {@code null} if the cause is unknown.
    */
-  private static <R> Deferred<R> tooManyAttemptsOrTimeout(final KuduRpc<R> 
request,
-                                                          final KuduException 
cause) {
+  static <R> Deferred<R> tooManyAttemptsOrTimeout(final KuduRpc<R> request,
+                                                  final KuduException cause) {
     String message;
     if (request.attempt > MAX_RPC_ATTEMPTS) {
       message = "too many attempts: ";
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/BeginTransactionRequest.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/BeginTransactionRequest.java
new file mode 100644
index 0000000..eccddb8
--- /dev/null
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/BeginTransactionRequest.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static 
org.apache.kudu.transactions.TxnManager.BeginTransactionResponsePB;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Message;
+import io.netty.util.Timer;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.kudu.transactions.TxnManager;
+import org.apache.kudu.util.Pair;
+
+/**
+ * A wrapper class for kudu.transactions.TxnManagerService.BeginTransaction 
RPC.
+ */
[email protected]
+class BeginTransactionRequest extends KuduRpc<BeginTransactionResponse> {
+  private static final List<Integer> featureFlags = ImmutableList.of();
+
+  BeginTransactionRequest(KuduTable fakeTable, Timer timer, long 
timeoutMillis) {
+    super(fakeTable, timer, timeoutMillis);
+  }
+
+  @Override
+  Message createRequestPB() {
+    return TxnManager.BeginTransactionRequestPB.getDefaultInstance();
+  }
+
+  @Override
+  String serviceName() {
+    return TXN_MANAGER_SERVICE_NAME;
+  }
+
+  @Override
+  String method() {
+    return "BeginTransaction";
+  }
+
+  @Override
+  Pair<BeginTransactionResponse, Object> deserialize(
+      final CallResponse callResponse, String serverUUID) throws KuduException 
{
+    final BeginTransactionResponsePB.Builder b = 
BeginTransactionResponsePB.newBuilder();
+    readProtobuf(callResponse.getPBMessage(), b);
+    if (!b.hasError()) {
+      Preconditions.checkState(b.hasTxnId());
+      Preconditions.checkState(b.hasKeepaliveMillis());
+    }
+    BeginTransactionResponse response = new BeginTransactionResponse(
+        timeoutTracker.getElapsedMillis(),
+        serverUUID,
+        b.getTxnId(),
+        b.getKeepaliveMillis());
+    return new Pair<>(response, b.hasError() ? b.getError() : null);
+  }
+
+  @Override
+  Collection<Integer> getRequiredFeatures() {
+    return featureFlags;
+  }
+}
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/BeginTransactionResponse.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/BeginTransactionResponse.java
new file mode 100644
index 0000000..03048be
--- /dev/null
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/BeginTransactionResponse.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import com.google.common.base.Preconditions;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public class BeginTransactionResponse extends KuduRpcResponse {
+  private final long txnId;
+  private final int keepaliveMillis;
+
+  /**
+   * @param elapsedMillis time in milliseconds since RPC creation to now
+   * @param serverUUID UUID of the server that sent the response
+   * @param txnId identifier of the new transaction
+   * @param keepaliveMillis keepalive interval for the newly started 
transaction
+   */
+  BeginTransactionResponse(
+      long elapsedMillis, String serverUUID, long txnId, int keepaliveMillis) {
+    super(elapsedMillis, serverUUID);
+    Preconditions.checkArgument(txnId > AsyncKuduClient.INVALID_TXN_ID);
+    Preconditions.checkArgument(keepaliveMillis >= 0);
+    this.txnId = txnId;
+    this.keepaliveMillis = keepaliveMillis;
+  }
+
+  /**
+   * @return the identifier of the started transaction
+   */
+  public long txnId() {
+    return txnId;
+  }
+
+  /**
+   * @return the keepalive interval for the started transaction (milliseconds)
+   */
+  public int keepaliveMillis() {
+    return keepaliveMillis;
+  }
+}
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/CommitTransactionRequest.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/CommitTransactionRequest.java
new file mode 100644
index 0000000..0cffde9
--- /dev/null
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/CommitTransactionRequest.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static 
org.apache.kudu.transactions.TxnManager.CommitTransactionResponsePB;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Message;
+import io.netty.util.Timer;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.kudu.transactions.TxnManager;
+import org.apache.kudu.util.Pair;
+
+/**
+ * A wrapper class for kudu.transactions.TxnManagerService.CommitTransaction 
RPC.
+ */
[email protected]
+class CommitTransactionRequest extends KuduRpc<CommitTransactionResponse> {
+  private static final List<Integer> featureFlags = ImmutableList.of();
+  final long txnId;
+
+  CommitTransactionRequest(
+      KuduTable masterTable, Timer timer, long timeoutMillis, long txnId) {
+    super(masterTable, timer, timeoutMillis);
+    Preconditions.checkArgument(txnId > AsyncKuduClient.INVALID_TXN_ID);
+    this.txnId = txnId;
+  }
+
+  @Override
+  Message createRequestPB() {
+    final TxnManager.CommitTransactionRequestPB.Builder b =
+        TxnManager.CommitTransactionRequestPB.newBuilder();
+    b.setTxnId(txnId);
+    return b.build();
+  }
+
+  @Override
+  String serviceName() {
+    return TXN_MANAGER_SERVICE_NAME;
+  }
+
+  @Override
+  String method() {
+    return "CommitTransaction";
+  }
+
+  @Override
+  Pair<CommitTransactionResponse, Object> deserialize(
+      final CallResponse callResponse, String serverUUID) throws KuduException 
{
+    final CommitTransactionResponsePB.Builder b =
+        CommitTransactionResponsePB.newBuilder();
+    readProtobuf(callResponse.getPBMessage(), b);
+    CommitTransactionResponse response = new CommitTransactionResponse(
+        timeoutTracker.getElapsedMillis(), serverUUID);
+    return new Pair<>(response, b.hasError() ? b.getError() : null);
+  }
+
+  @Override
+  Collection<Integer> getRequiredFeatures() {
+    return featureFlags;
+  }
+}
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/CommitTransactionResponse.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/CommitTransactionResponse.java
new file mode 100644
index 0000000..297b0f1
--- /dev/null
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/CommitTransactionResponse.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import com.google.common.base.Preconditions;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public class CommitTransactionResponse extends KuduRpcResponse {
+  /**
+   * @param elapsedMillis time in milliseconds since RPC creation to now
+   * @param serverUUID UUID of the server that sent the response
+   */
+  CommitTransactionResponse(long elapsedMillis, String serverUUID) {
+    super(elapsedMillis, serverUUID);
+  }
+}
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateRequest.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateRequest.java
new file mode 100644
index 0000000..be606cd
--- /dev/null
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateRequest.java
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static 
org.apache.kudu.transactions.TxnManager.GetTransactionStateResponsePB;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Message;
+import io.netty.util.Timer;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.kudu.transactions.TxnManager;
+import org.apache.kudu.util.Pair;
+
+/**
+ * A wrapper class for kudu.transactions.TxnManagerService.GetTransactionState 
RPC.
+ */
[email protected]
+class GetTransactionStateRequest extends KuduRpc<GetTransactionStateResponse> {
+  private static final List<Integer> featureFlags = ImmutableList.of();
+  final long txnId;
+
+  GetTransactionStateRequest(
+      KuduTable masterTable, Timer timer, long timeoutMillis, long txnId) {
+    super(masterTable, timer, timeoutMillis);
+    Preconditions.checkArgument(txnId > AsyncKuduClient.INVALID_TXN_ID);
+    this.txnId = txnId;
+  }
+
+  @Override
+  Message createRequestPB() {
+    final TxnManager.GetTransactionStateRequestPB.Builder b =
+        TxnManager.GetTransactionStateRequestPB.newBuilder();
+    b.setTxnId(txnId);
+    return b.build();
+  }
+
+  @Override
+  String serviceName() {
+    return TXN_MANAGER_SERVICE_NAME;
+  }
+
+  @Override
+  String method() {
+    return "GetTransactionState";
+  }
+
+  @Override
+  Pair<GetTransactionStateResponse, Object> deserialize(
+      final CallResponse callResponse, String serverUUID) throws KuduException 
{
+    final GetTransactionStateResponsePB.Builder b =
+        GetTransactionStateResponsePB.newBuilder();
+    readProtobuf(callResponse.getPBMessage(), b);
+    if (!b.hasError()) {
+      Preconditions.checkState(b.hasState());
+    }
+    GetTransactionStateResponse response = new GetTransactionStateResponse(
+        timeoutTracker.getElapsedMillis(), serverUUID, b.getState());
+    return new Pair<>(response, b.hasError() ? b.getError() : null);
+  }
+
+  @Override
+  Collection<Integer> getRequiredFeatures() {
+    return featureFlags;
+  }
+}
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateResponse.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateResponse.java
new file mode 100644
index 0000000..0dc576a
--- /dev/null
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateResponse.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.kudu.transactions.Transactions;
+
[email protected]
+public class GetTransactionStateResponse extends KuduRpcResponse {
+  private final Transactions.TxnStatePB txnState;
+
+  /**
+   * @param elapsedMillis time in milliseconds since RPC creation to now
+   * @param serverUUID UUID of the server that sent the response
+   * @param txnState the state of the transaction
+   */
+  GetTransactionStateResponse(
+      long elapsedMillis, String serverUUID, Transactions.TxnStatePB txnState) 
{
+    super(elapsedMillis, serverUUID);
+    this.txnState = txnState;
+  }
+
+  public Transactions.TxnStatePB txnState() {
+    return txnState;
+  }
+
+  public boolean isCommitted() {
+    return txnState == Transactions.TxnStatePB.COMMITTED;
+  }
+
+  public boolean isAborted() {
+    return txnState == Transactions.TxnStatePB.ABORTED;
+  }
+}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index a224c65..095a9a0 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -95,9 +95,10 @@ public abstract class KuduRpc<R> {
    */
   static final int MAX_RPC_SIZE = 256 * 1024 * 1024; // 256MB
 
-  // Service names.
+  // Service names used by the client.
   protected static final String MASTER_SERVICE_NAME = 
"kudu.master.MasterService";
   protected static final String TABLET_SERVER_SERVICE_NAME = 
"kudu.tserver.TabletServerService";
+  protected static final String TXN_MANAGER_SERVICE_NAME = 
"kudu.transactions.TxnManagerService";
 
   private static final Logger LOG = LoggerFactory.getLogger(KuduRpc.class);
 
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index a281b4d..28842e7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -18,15 +18,21 @@
 package org.apache.kudu.client;
 
 import java.io.IOException;
+import javax.annotation.Nullable;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.kudu.transactions.Transactions;
 import org.apache.kudu.transactions.Transactions.TxnTokenPB;
 
 /**
@@ -204,7 +210,9 @@ public class KuduTransaction implements AutoCloseable {
       Preconditions.checkState(!isInFlight);
     }
 
-    // TODO(aserbin): implement
+    // Make corresponding call to TxnManager and process the response,
+    // in a synchronous way.
+    doBeginTransaction();
 
     // Once the heavy-lifting has successfully completed, mark this instance
     // as a handle for an in-flight transaction.
@@ -255,9 +263,14 @@ public class KuduTransaction implements AutoCloseable {
    * @throws KuduException if something went wrong
    */
   public void commit(boolean wait) throws KuduException {
-    Preconditions.checkState(isInFlight);
+    Preconditions.checkState(isInFlight, "transaction is not open for this 
handle");
+    CommitTransactionRequest req = doCommitTransaction();
 
-    // TODO(aserbin): implement
+    if (wait) {
+      Deferred<GetTransactionStateResponse> txnState =
+          getDelayedIsTransactionCommittedDeferred(req);
+      KuduClient.joinAndHandleException(txnState);
+    }
 
     // Once everything else is completed successfully, mark the transaction as
     // no longer in flight.
@@ -274,9 +287,22 @@ public class KuduTransaction implements AutoCloseable {
    *                       the state of the transaction
    */
   public boolean isCommitComplete() throws KuduException {
-    Preconditions.checkState(!isInFlight);
-    // TODO(aserbin): implement
-    return false;
+    Deferred<GetTransactionStateResponse> d = isTransactionCommittedAsync();
+    GetTransactionStateResponse resp = KuduClient.joinAndHandleException(d);
+    final Transactions.TxnStatePB txnState = resp.txnState();
+    switch (txnState) {
+      case ABORTED:
+        throw new NonRecoverableException(Status.Aborted("transaction was 
aborted"));
+      case OPEN:
+        throw new NonRecoverableException(Status.IllegalState("transaction is 
still open"));
+      case COMMITTED:
+        return true;
+      case COMMIT_IN_PROGRESS:
+        return false;
+      default:
+        throw new NonRecoverableException(Status.NotSupported(
+            "unexpected transaction state: " + txnState.toString()));
+    }
   }
 
   /**
@@ -292,8 +318,7 @@ public class KuduTransaction implements AutoCloseable {
    */
   public void rollback() throws KuduException {
     Preconditions.checkState(isInFlight, "transaction is not open for this 
handle");
-
-    // TODO(aserbin): implement
+    doRollbackTransaction();
 
     // Once everything else is completed successfully, mark the transaction as
     // no longer in flight.
@@ -394,4 +419,127 @@ public class KuduTransaction implements AutoCloseable {
       LOG.error("exception while automatically rolling back a transaction", e);
     }
   }
+
+  private void doBeginTransaction() throws KuduException {
+    BeginTransactionRequest request = new BeginTransactionRequest(
+        client.getMasterTable(),
+        client.getTimer(),
+        client.getDefaultAdminOperationTimeoutMs());
+    Deferred<BeginTransactionResponse> d = client.sendRpcToTablet(request);
+    BeginTransactionResponse resp = KuduClient.joinAndHandleException(d);
+    txnId = resp.txnId();
+    keepaliveMillis = resp.keepaliveMillis();
+  }
+
+  private void doRollbackTransaction() throws KuduException {
+    AbortTransactionRequest request = new AbortTransactionRequest(
+        client.getMasterTable(),
+        client.getTimer(),
+        client.getDefaultAdminOperationTimeoutMs(),
+        txnId);
+    Deferred<AbortTransactionResponse> d = client.sendRpcToTablet(request);
+    KuduClient.joinAndHandleException(d);
+  }
+
+  private CommitTransactionRequest doCommitTransaction() throws KuduException {
+    CommitTransactionRequest request = new CommitTransactionRequest(
+        client.getMasterTable(),
+        client.getTimer(),
+        client.getDefaultAdminOperationTimeoutMs(),
+        txnId);
+    Deferred<CommitTransactionResponse> d = client.sendRpcToTablet(request);
+    KuduClient.joinAndHandleException(d);
+    return request;
+  }
+
+  private Deferred<GetTransactionStateResponse> isTransactionCommittedAsync() {
+    GetTransactionStateRequest request = new GetTransactionStateRequest(
+        client.getMasterTable(),
+        client.getTimer(),
+        client.getDefaultAdminOperationTimeoutMs(),
+        txnId);
+    return client.sendRpcToTablet(request);
+  }
+
+  Deferred<GetTransactionStateResponse> 
getDelayedIsTransactionCommittedDeferred(
+      KuduRpc<?> parent) {
+    // TODO(aserbin): By scheduling even the first RPC via timer, the sequence 
of
+    // RPCs is delayed by at least one timer tick, which is unfortunate for the
+    // case where the transaction is fully committed.
+    //
+    // Eliminating the delay by sending the first RPC immediately (and
+    // scheduling the rest via timer) would also allow us to replace this 
"fake"
+    // RPC with a real one.
+    KuduRpc<GetTransactionStateResponse> fakeRpc = client.buildFakeRpc(
+        "GetTransactionState", parent);
+
+    // Store the Deferred locally; callback() or errback() on the RPC will
+    // reset it and we'd return a different, non-triggered Deferred.
+    Deferred<GetTransactionStateResponse> fakeRpcD = fakeRpc.getDeferred();
+
+    delayedIsTransactionCommitted(
+        fakeRpc,
+        isTransactionCommittedCb(fakeRpc),
+        isTransactionCommittedErrb(fakeRpc));
+    return fakeRpcD;
+  }
+
+  private void delayedIsTransactionCommitted(
+      final KuduRpc<GetTransactionStateResponse> rpc,
+      final Callback<Deferred<GetTransactionStateResponse>,
+          GetTransactionStateResponse> callback,
+      final Callback<Exception, Exception> errback) {
+    final class RetryTimer implements TimerTask {
+      @Override
+      public void run(final Timeout timeout) {
+        isTransactionCommittedAsync().addCallbacks(callback, errback);
+      }
+    }
+
+    long sleepTimeMillis = client.getSleepTimeForRpcMillis(rpc);
+    if (rpc.timeoutTracker.wouldSleepingTimeoutMillis(sleepTimeMillis)) {
+      AsyncKuduClient.tooManyAttemptsOrTimeout(rpc, null);
+      return;
+    }
+    AsyncKuduClient.newTimeout(client.getTimer(), new RetryTimer(), 
sleepTimeMillis);
+  }
+
+  /**
+   * Returns a callback to be called upon completion of GetTransactionState 
RPC.
+   * If the transaction is committed, triggers the provided RPC's callback 
chain
+   * with 'txnResp' as its value. Otherwise, sends another GetTransactionState
+   * RPC after sleeping.
+   * <p>
+   * @param rpc RPC that initiated this sequence of operations
+   * @return callback that will eventually return 'txnResp'
+   */
+  private Callback<Deferred<GetTransactionStateResponse>, 
GetTransactionStateResponse>
+      isTransactionCommittedCb(final KuduRpc<GetTransactionStateResponse> rpc) 
{
+    return resp -> {
+      // Store the Deferred locally; callback() below will reset it and we'd
+      // return a different, non-triggered Deferred.
+      Deferred<GetTransactionStateResponse> d = rpc.getDeferred();
+      if (resp.isCommitted()) {
+        rpc.callback(resp);
+      } else if (resp.isAborted()) {
+        rpc.errback(new NonRecoverableException(
+            Status.Aborted("transaction was aborted")));
+      } else {
+        rpc.attempt++;
+        delayedIsTransactionCommitted(
+            rpc,
+            isTransactionCommittedCb(rpc),
+            isTransactionCommittedErrb(rpc));
+      }
+      return d;
+    };
+  }
+
+  private <R> Callback<Exception, Exception> isTransactionCommittedErrb(
+      final KuduRpc<R> rpc) {
+    return e -> {
+      rpc.errback(e);
+      return e;
+    };
+  }
 }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
index 36b1737..ce0fd92 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -46,6 +46,7 @@ import org.apache.kudu.WireProtocol;
 import org.apache.kudu.master.Master;
 import org.apache.kudu.rpc.RpcHeader;
 import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
+import org.apache.kudu.transactions.TxnManager;
 import org.apache.kudu.tserver.Tserver;
 import org.apache.kudu.util.Pair;
 
@@ -271,7 +272,7 @@ class RpcProxy {
     // We can get this Message from within the RPC's expected type,
     // so convert it into an exception and nullify decoded so that we use the 
errback route.
     // Have to do it for both TS and Master errors.
-    if (decoded != null) {
+    if (decoded != null && decoded.getSecond() != null) {
       if (decoded.getSecond() instanceof Tserver.TabletServerErrorPB) {
         Tserver.TabletServerErrorPB error = (Tserver.TabletServerErrorPB) 
decoded.getSecond();
         exception = dispatchTSError(client, connection, rpc, error, 
traceBuilder);
@@ -291,6 +292,22 @@ class RpcProxy {
         } else {
           decoded = null;
         }
+      } else if (decoded.getSecond() instanceof TxnManager.TxnManagerErrorPB) {
+        TxnManager.TxnManagerErrorPB error =
+            (TxnManager.TxnManagerErrorPB) decoded.getSecond();
+        exception = dispatchTxnManagerError(client, connection, rpc, error, 
traceBuilder);
+        if (exception == null) {
+          // Exception was taken care of.
+          return;
+        } else {
+          decoded = null;
+        }
+      } else {
+        rpc.addTrace(traceBuilder.build());
+        exception = new NonRecoverableException(Status.NotSupported(
+            "unexpected error from server side: " + 
decoded.getSecond().toString()));
+        rpc.errback(exception);
+        return;
       }
     }
 
@@ -398,6 +415,40 @@ class RpcProxy {
   }
 
   /**
+   * Handle for various kinds of TxnManager errors. As of now, only
+   * SERVICE_UNAVAILABLE is a re-triable error.
+   *
+   * @param client client object to handle response and sending retries, if 
needed
+   * @param connection connection to send the request over
+   * @param rpc the original RPC call that triggered the error
+   * @param pbError the error the master sent
+   * @param tracer RPC trace builder to add a record on the error into the 
call history
+   * @return an exception if we couldn't dispatch the error, or null
+   */
+  private static KuduException dispatchTxnManagerError(
+      AsyncKuduClient client,
+      Connection connection,
+      KuduRpc<?> rpc,
+      TxnManager.TxnManagerErrorPB pbError,
+      RpcTraceFrame.RpcTraceFrameBuilder tracer) {
+    final WireProtocol.AppStatusPB.ErrorCode code = 
pbError.getStatus().getCode();
+    final Status status = Status.fromTxnManagerErrorPB(pbError);
+    if (code != WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
+      return new NonRecoverableException(status);
+    }
+
+    // TODO(aserbin): try sending request to other TxnManager instance,
+    //                if possible. The idea is that Kudu clusters are expected
+    //                expected to have multiple masters, so if one TxnManager
+    //                instance is not available, there is a high chance that
+    //                others are still available (TxnManager is hosted by a
+    //                kudu-master process).
+    client.handleRetryableError(rpc, new RecoverableException(status));
+    rpc.addTrace(tracer.callStatus(status).build());
+    return null;
+  }
+
+  /**
    * Retry the given RPC.
    *
    * @param client client object to handle response and sending retries, if 
needed
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Status.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/Status.java
index d0d0d4d..0455755 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Status.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Status.java
@@ -22,6 +22,7 @@ import org.apache.yetus.audience.InterfaceStability;
 
 import org.apache.kudu.WireProtocol;
 import org.apache.kudu.master.Master;
+import org.apache.kudu.transactions.TxnManager;
 import org.apache.kudu.tserver.Tserver;
 
 /**
@@ -95,6 +96,16 @@ public class Status {
   }
 
   /**
+   * Create a status object from a TxnManager's error.
+   * @param pbError protobuf object received via RPC from the TxnManager
+   * @return status object equivalent to the protobuf
+   */
+  static Status fromTxnManagerErrorPB(TxnManager.TxnManagerErrorPB pbError) {
+    assert pbError.hasStatus() : "no status in PB " + pbError;
+    return new Status(pbError.getStatus());
+  }
+
+  /**
    * Create a Status object from a {@link WireProtocol.AppStatusPB} protobuf 
object.
    * Package-private because we shade Protobuf and this is not usable outside 
this package.
    */
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
new file mode 100644
index 0000000..c0bbaf5
--- /dev/null
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -0,0 +1,457 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.function.ThrowingRunnable;
+
+import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.KuduTestHarness.MasterServerConfig;
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
+import org.apache.kudu.transactions.Transactions.TxnTokenPB;
+
+
+public class TestKuduTransaction {
+  private KuduClient client;
+  private AsyncKuduClient asyncClient;
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  @Before
+  public void setUp() {
+    client = harness.getClient();
+    asyncClient = harness.getAsyncClient();
+  }
+
+  private KuduTransaction makeFakeTransaction(KuduTransaction txn) throws 
IOException {
+    byte[] buf = txn.serialize();
+    final TxnTokenPB pb = 
TxnTokenPB.parseFrom(CodedInputStream.newInstance(buf));
+    assertTrue(pb.hasTxnId());
+    final long txnId = pb.getTxnId();
+    assertTrue(txnId > AsyncKuduClient.INVALID_TXN_ID);
+
+    final long fakeTxnId = txnId + 123;
+
+    TxnTokenPB.Builder b = TxnTokenPB.newBuilder();
+    b.setTxnId(fakeTxnId);
+    b.setEnableKeepalive(false);
+    b.setKeepaliveMillis(0);
+    TxnTokenPB message = b.build();
+    byte[] fakeTxnBuf = new byte[message.getSerializedSize()];
+    CodedOutputStream cos = CodedOutputStream.newInstance(fakeTxnBuf);
+    message.writeTo(cos);
+    cos.flush();
+    return KuduTransaction.deserialize(fakeTxnBuf, asyncClient);
+  }
+
+  /**
+   * Test scenario that starts a new transaction given an instance of
+   * KuduClient. The purpose of this test is to make sure it's possible
+   * to start a new transaction given a KuduClient object.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled",
+  })
+  public void testNewTransaction() throws Exception {
+    KuduTransaction txn = client.newTransaction();
+    assertNotNull(txn);
+    byte[] buf = txn.serialize();
+    assertNotNull(buf);
+    final TxnTokenPB pb = 
TxnTokenPB.parseFrom(CodedInputStream.newInstance(buf));
+    assertTrue(pb.hasTxnId());
+    assertTrue(pb.getTxnId() > AsyncKuduClient.INVALID_TXN_ID);
+    assertTrue(pb.hasEnableKeepalive());
+    // By default, keepalive is disabled for a serialized txn token.
+    assertFalse(pb.getEnableKeepalive());
+    assertTrue(pb.hasKeepaliveMillis());
+    assertTrue(pb.getKeepaliveMillis() > 0);
+  }
+
+  /**
+   * Test scenario that starts many new transaction given an instance of
+   * KuduClient.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled",
+  })
+  public void testStartManyTransactions() throws Exception {
+    List<KuduTransaction> transactions = new ArrayList<>();
+    for (int i = 0; i < 1000; ++i) {
+      KuduTransaction txn = client.newTransaction();
+      assertNotNull(txn);
+      transactions.add(txn);
+    }
+    for (KuduTransaction txn : transactions) {
+      txn.rollback();
+    }
+  }
+
+  /**
+   * Test scenario that starts a new transaction and rolls it back.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled",
+  })
+  public void testRollbackAnEmptyTransaction() throws Exception {
+    KuduTransaction txn = client.newTransaction();
+    txn.rollback();
+    // A duplicate call to rollback an aborted transaction using the same
+    // handle should report an error.
+    IllegalStateException ex = assertThrows(
+        IllegalStateException.class, new ThrowingRunnable() {
+          @Override
+          public void run() throws Throwable {
+            txn.rollback();
+          }
+        });
+    assertEquals("transaction is not open for this handle", ex.getMessage());
+
+    // Try to rollback the same transaction using another handle that has been
+    // constructed using serialize/deserialize sequence: it should be fine
+    // since aborting a transaction has idempotent semantics for the back-end.
+    byte[] buf = txn.serialize();
+    KuduTransaction serdesTxn = KuduTransaction.deserialize(buf, asyncClient);
+    serdesTxn.rollback();
+  }
+
+  /**
+   * Test scenario that starts a new transaction and commits it right away.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled",
+  })
+  public void testCommitAnEmptyTransaction() throws Exception {
+    KuduTransaction txn = client.newTransaction();
+    txn.commit(false);
+    // A duplicate call to commit the transaction using the same handle
+    // should fail.
+    IllegalStateException ex = assertThrows(
+        IllegalStateException.class, new ThrowingRunnable() {
+          @Override
+          public void run() throws Throwable {
+            txn.commit(false);
+          }
+        });
+    assertEquals("transaction is not open for this handle", ex.getMessage());
+
+    // Try to commit the same transaction using another handle that has been
+    // constructed using serialize/deserialize sequence: it should be fine
+    // since committing a transaction has idempotent semantics for the 
back-end.
+    byte[] buf = txn.serialize();
+    KuduTransaction serdesTxn = KuduTransaction.deserialize(buf, asyncClient);
+    serdesTxn.commit(false);
+  }
+
+  /**
+   * Test scenario that tries to commit a non-existent transaction.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled",
+  })
+  public void testCommitNonExistentTransaction() throws Exception {
+    KuduTransaction txn = client.newTransaction();
+    assertNotNull(txn);
+    KuduTransaction fakeTxn = makeFakeTransaction(txn);
+    try {
+      // Try to commit the transaction in non-synchronous mode, i.e. just
+      // initiate committing the transaction.
+      fakeTxn.commit(false);
+      fail("committing a non-existing transaction should have failed");
+    } catch (NonRecoverableException e) {
+      final String errmsg = e.getMessage();
+      final Status status = e.getStatus();
+      assertTrue(status.toString(), status.isNotFound());
+      assertTrue(errmsg, errmsg.matches(".*transaction ID .* not found.*"));
+    } catch (Exception e) {
+      fail("unexpected exception: " + e.toString());
+    }
+
+    try {
+      // Try to commit the transaction in synchronous mode, i.e. initiate
+      // committing the transaction and wait for the commit phase to finalize.
+      fakeTxn.commit(true);
+      fail("committing a non-existing transaction should have failed");
+    } catch (NonRecoverableException e) {
+      final String errmsg = e.getMessage();
+      final Status status = e.getStatus();
+      assertTrue(status.toString(), status.isNotFound());
+      assertTrue(errmsg, errmsg.matches(".*transaction ID .* not found.*"));
+    } catch (Exception e) {
+      fail("unexpected exception: " + e.toString());
+    }
+  }
+
+  /**
+   * Test scenario that starts a new transaction, initiates its commit phase,
+   * and checks whether the commit is complete using the
+   * KuduTransaction.isCommitComplete() method.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled",
+  })
+  public void testIsCommitComplete() throws Exception {
+    KuduTransaction txn = client.newTransaction();
+
+    txn.commit(false);
+    // TODO(aserbin): artificially delay the transaction's commit phase once
+    //                the transaction commit orchestration is implemented
+    assertFalse(txn.isCommitComplete());
+  }
+
+  /**
+   * Verify how KuduTransaction.isCommitComplete() works for a transaction 
handle
+   * in a few special cases.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled",
+  })
+  public void testIsCommitCompleteSpecialCases() throws Exception {
+    KuduTransaction txn = client.newTransaction();
+
+    {
+      NonRecoverableException ex = assertThrows(
+          NonRecoverableException.class, new ThrowingRunnable() {
+            @Override
+            public void run() throws Throwable {
+              txn.isCommitComplete();
+            }
+          });
+      assertTrue(ex.getStatus().isIllegalState());
+      assertEquals("transaction is still open", ex.getMessage());
+    }
+
+    // Rollback the transaction.
+    txn.rollback();
+
+    {
+      NonRecoverableException ex = assertThrows(
+          NonRecoverableException.class, new ThrowingRunnable() {
+            @Override
+            public void run() throws Throwable {
+              txn.isCommitComplete();
+            }
+          });
+      assertTrue(ex.getStatus().isAborted());
+      assertEquals("transaction was aborted", ex.getMessage());
+    }
+
+    // Try to call isCommitComplete() on a handle that isn't backed by any
+    // transaction registered with the system.
+    {
+      KuduTransaction fakeTxn = makeFakeTransaction(txn);
+      NonRecoverableException ex = assertThrows(
+          NonRecoverableException.class, new ThrowingRunnable() {
+            @Override
+            public void run() throws Throwable {
+              fakeTxn.isCommitComplete();
+            }
+          });
+      final Status status = ex.getStatus();
+      assertTrue(status.toString(), status.isNotFound());
+      final String errmsg = ex.getMessage();
+      assertTrue(errmsg, errmsg.matches(".*transaction ID .* not found.*"));
+    }
+  }
+
+  /**
+   * Test scenario that starts a new transaction and commits it in a 
synchronous
+   * way (i.e. waits for the transaction to be committed).
+   *
+   * TODO(aserbin): uncomment this once txn commit orchestration is ready
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled",
+  })
+  public void testCommitAnEmptyTransactionWait() throws Exception {
+    KuduTransaction txn = client.newTransaction();
+    txn.commit(true);
+    assertTrue(txn.isCommitComplete());
+  }
+   */
+
+  /**
+   * A test scenario to start a new transaction and commit it in a synchronous
+   * way (i.e. wait for the transaction to be committed) when the back-end is
+   * running in the test-only mode to immediately finalize a transaction
+   * right after transitioning its state to COMMIT_IN_PROGRESS.
+   *
+   * TODO(aserbin): remove this scenario once txn commit orchestration is ready
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled",
+  })
+  @TabletServerConfig(flags = {
+      "--txn_status_manager_finalize_commit_on_begin",
+  })
+  public void testCommitAnEmptyTransactionWaitFake2PCO() throws Exception {
+    KuduTransaction txn = client.newTransaction();
+    txn.commit(true);
+    assertTrue(txn.isCommitComplete());
+  }
+
+  /**
+   * Test scenario that tries to rollback a non-existent transaction.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled",
+  })
+  public void testRollbackNonExistentTransaction() throws Exception {
+    KuduTransaction txn = client.newTransaction();
+    assertNotNull(txn);
+    KuduTransaction fakeTxn = makeFakeTransaction(txn);
+    try {
+      fakeTxn.rollback();
+      fail("rolling back non-existing transaction should have failed");
+    } catch (NonRecoverableException e) {
+      final String errmsg = e.getMessage();
+      final Status status = e.getStatus();
+      assertTrue(status.toString(), status.isNotFound());
+      assertTrue(errmsg, errmsg.matches(".*transaction ID .* not found.*"));
+    } catch (Exception e) {
+      fail("unexpected exception: " + e.toString());
+    }
+  }
+
+  /**
+   * Test scenario that starts a new transaction given an instance of
+   * AsyncKuduClient. The purpose of this test is to make sure it's possible
+   * to start a new transaction given an AsyncKuduClient object.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled",
+  })
+  public void testNewTransactionAsyncClient() throws Exception {
+    KuduTransaction txn = client.newTransaction();
+    assertNotNull(txn);
+    byte[] buf = txn.serialize();
+    final TxnTokenPB pb = 
TxnTokenPB.parseFrom(CodedInputStream.newInstance(buf));
+    assertTrue(pb.hasTxnId());
+    assertTrue(pb.getTxnId() > AsyncKuduClient.INVALID_TXN_ID);
+    assertTrue(pb.hasEnableKeepalive());
+    // By default, keepalive is disabled for a serialized txn token.
+    assertFalse(pb.getEnableKeepalive());
+    assertTrue(pb.hasKeepaliveMillis());
+    assertTrue(pb.getKeepaliveMillis() > 0);
+  }
+
+  /**
+   * Test scenario that starts a transaction and creates a new transactional
+   * KuduSession based on the newly started transaction.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled",
+  })
+  public void testNewTransactionalSession() throws Exception {
+    KuduTransaction txn = client.newTransaction();
+    assertNotNull(txn);
+    KuduSession session = txn.newKuduSession();
+    assertNotNull(session);
+    // TODO(aserbin): insert a few rows and rollback the transaction; run a
+    //                table scan: the rows should not be there
+    txn.rollback();
+  }
+
+  /**
+   * Test scenario that starts a transaction and creates a new transactional
+   * AsyncKuduSession based on the newly started transaction.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled",
+  })
+  public void testNewAsyncTransactionalSession() throws Exception {
+    KuduTransaction txn = client.newTransaction();
+    assertNotNull(txn);
+    AsyncKuduSession session = txn.newAsyncKuduSession();
+    assertNotNull(session);
+    // TODO(aserbin): insert a few rows and rollback the transaction; run a
+    //                table scan: the rows should not be there
+    txn.rollback();
+  }
+
+  /**
+   * Try to start a transaction when the backend doesn't have the required
+   * functionality (e.g. a backend which predates the introduction of the
+   * txn-related functionality).
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled=false",
+  })
+  public void testTxnOpsWithoutTxnManager() throws Exception {
+    try (KuduTransaction txn = client.newTransaction()) {
+      fail("starting a new transaction without TxnManager should have failed");
+    } catch (KuduException e) {
+      final String errmsg = e.getMessage();
+      final Status status = e.getStatus();
+      assertTrue(status.toString(), status.isRemoteError());
+      assertTrue(errmsg, errmsg.matches(".* Not found: .*"));
+      assertTrue(errmsg, errmsg.matches(
+          ".* kudu.transactions.TxnManagerService not registered on Master"));
+    } catch (Exception e) {
+      fail("unexpected exception: " + e.toString());
+    }
+  }
+
+  /**
+   * Test KuduTransaction to be used in auto-closable manner.
+   *
+   * TOOD(aserbin): update this once transaction handles send keepalive 
messages
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled=true",
+  })
+  public void testAutoclosableUsage() throws Exception {
+    try (KuduTransaction txn = client.newTransaction()) {
+      byte[] buf = txn.serialize();
+      assertNotNull(buf);
+      txn.commit(false);
+      txn.isCommitComplete();
+    } catch (Exception e) {
+      fail("unexpected exception: " + e.toString());
+    }
+  }
+}
diff --git a/src/kudu/transactions/txn_status_manager.cc 
b/src/kudu/transactions/txn_status_manager.cc
index 8062df6..27ed3d8 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -73,6 +73,16 @@ DEFINE_uint32(txn_staleness_tracker_interval_ms, 10000,
 TAG_FLAG(txn_staleness_tracker_interval_ms, experimental);
 TAG_FLAG(txn_staleness_tracker_interval_ms, runtime);
 
+// TODO(aserbin): remove this test-only crutch once the orchestration of
+//                the two phase commit is implemented
+DEFINE_bool(txn_status_manager_finalize_commit_on_begin, false,
+            "Finalize committing a transaction automatically right after "
+            "changing its state to COMMIT_IN_PROGRESS during processing "
+            "a call to CoordinateTransaction() of the BEGIN_COMMIT_TXN type. "
+            "Used only for tests.");
+TAG_FLAG(txn_status_manager_finalize_commit_on_begin, hidden);
+TAG_FLAG(txn_status_manager_finalize_commit_on_begin, unsafe);
+
 using kudu::pb_util::SecureShortDebugString;
 using kudu::tablet::ParticipantIdsByTxnId;
 using kudu::tserver::TabletServerErrorPB;
@@ -319,6 +329,13 @@ Status TxnStatusManager::BeginCommitTransaction(int64_t 
txn_id, const string& us
   mutable_data->pb.set_state(TxnStatePB::COMMIT_IN_PROGRESS);
   RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb, 
ts_error));
   txn_lock.Commit();
+
+  // TODO(aserbin): remove this test-only crutch once the orchestration of
+  //                the two phase commit is implemented
+  if (PREDICT_FALSE(FLAGS_txn_status_manager_finalize_commit_on_begin)) {
+    RETURN_NOT_OK(FinalizeCommitTransaction(txn_id, ts_error));
+  }
+
   return Status::OK();
 }
 

Reply via email to