This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 6b39d88 KUDU-2612 keep-alive txn heartbeating for Java client
6b39d88 is described below
commit 6b39d88f11abd6e52cf91999aa66ce0642372599
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Jan 19 09:00:03 2021 -0800
KUDU-2612 keep-alive txn heartbeating for Java client
This patch adds keep-alive txn heartbeating into the Kudu Java client.
The txn keepalive heartbeating is performed automatically by the client,
and no API is exposed to send keep-alive messages for a transaction.
The txn keepalive heartbeating continues until the original
auto-closeable transaction handle (i.e. the handle created by
KuduClient.newTransaction()) goes out of scope or the
KuduTransaction.close() method is called explicitly.
Overall, the keepalive heartbeating behavior in the Kudu Java client
mirrors the behavior of its C++ counterpart.
This patch also contains a couple of test scenarios to cover the
newly introduced functionality.
Change-Id: I6326a5452223d8173b2da004bb5f3ab0c1e6ae4e
Reviewed-on: http://gerrit.cloudera.org:8080/16967
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
.../kudu/client/BeginTransactionRequest.java | 4 +-
...quest.java => KeepTransactionAliveRequest.java} | 39 ++--
.../kudu/client/KeepTransactionAliveResponse.java | 32 +++
.../org/apache/kudu/client/KuduTransaction.java | 131 ++++++++++-
.../apache/kudu/client/TestKuduTransaction.java | 255 ++++++++++++++++++++-
5 files changed, 434 insertions(+), 27 deletions(-)
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/BeginTransactionRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/BeginTransactionRequest.java
index eccddb8..9e4bff5 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/BeginTransactionRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/BeginTransactionRequest.java
@@ -38,8 +38,8 @@ import org.apache.kudu.util.Pair;
class BeginTransactionRequest extends KuduRpc<BeginTransactionResponse> {
private static final List<Integer> featureFlags = ImmutableList.of();
- BeginTransactionRequest(KuduTable fakeTable, Timer timer, long
timeoutMillis) {
- super(fakeTable, timer, timeoutMillis);
+ BeginTransactionRequest(KuduTable masterTable, Timer timer, long
timeoutMillis) {
+ super(masterTable, timer, timeoutMillis);
}
@Override
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/BeginTransactionRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KeepTransactionAliveRequest.java
similarity index 61%
copy from
java/kudu-client/src/main/java/org/apache/kudu/client/BeginTransactionRequest.java
copy to
java/kudu-client/src/main/java/org/apache/kudu/client/KeepTransactionAliveRequest.java
index eccddb8..b446b2a 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/BeginTransactionRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/KeepTransactionAliveRequest.java
@@ -17,7 +17,7 @@
package org.apache.kudu.client;
-import static
org.apache.kudu.transactions.TxnManager.BeginTransactionResponsePB;
+import static
org.apache.kudu.transactions.TxnManager.KeepTransactionAliveResponsePB;
import java.util.Collection;
import java.util.List;
@@ -32,19 +32,28 @@ import org.apache.kudu.transactions.TxnManager;
import org.apache.kudu.util.Pair;
/**
- * A wrapper class for kudu.transactions.TxnManagerService.BeginTransaction
RPC.
+ * A wrapper class for
+ * kudu.transactions.TxnManagerService.KeepTransactionAlive RPC.
*/
@InterfaceAudience.Private
-class BeginTransactionRequest extends KuduRpc<BeginTransactionResponse> {
+class KeepTransactionAliveRequest extends
KuduRpc<KeepTransactionAliveResponse> {
private static final List<Integer> featureFlags = ImmutableList.of();
+ private final long txnId;
- BeginTransactionRequest(KuduTable fakeTable, Timer timer, long
timeoutMillis) {
- super(fakeTable, timer, timeoutMillis);
+ KeepTransactionAliveRequest(KuduTable masterTable,
+ Timer timer,
+ long timeoutMillis,
+ long txnId) {
+ super(masterTable, timer, timeoutMillis);
+ Preconditions.checkArgument(txnId > AsyncKuduClient.INVALID_TXN_ID);
+ this.txnId = txnId;
}
@Override
Message createRequestPB() {
- return TxnManager.BeginTransactionRequestPB.getDefaultInstance();
+ final TxnManager.KeepTransactionAliveRequestPB.Builder b =
+ TxnManager.KeepTransactionAliveRequestPB.newBuilder();
+ b.setTxnId(txnId);
+ return b.build();
}
@Override
@@ -54,23 +63,17 @@ class BeginTransactionRequest extends
KuduRpc<BeginTransactionResponse> {
@Override
String method() {
- return "BeginTransaction";
+ return "KeepTransactionAlive";
}
@Override
- Pair<BeginTransactionResponse, Object> deserialize(
+ Pair<KeepTransactionAliveResponse, Object> deserialize(
final CallResponse callResponse, String serverUUID) throws KuduException
{
- final BeginTransactionResponsePB.Builder b =
BeginTransactionResponsePB.newBuilder();
+ final KeepTransactionAliveResponsePB.Builder b =
+ KeepTransactionAliveResponsePB.newBuilder();
readProtobuf(callResponse.getPBMessage(), b);
- if (!b.hasError()) {
- Preconditions.checkState(b.hasTxnId());
- Preconditions.checkState(b.hasKeepaliveMillis());
- }
- BeginTransactionResponse response = new BeginTransactionResponse(
- timeoutTracker.getElapsedMillis(),
- serverUUID,
- b.getTxnId(),
- b.getKeepaliveMillis());
+ KeepTransactionAliveResponse response = new KeepTransactionAliveResponse(
+ timeoutTracker.getElapsedMillis(), serverUUID);
return new Pair<>(response, b.hasError() ? b.getError() : null);
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KeepTransactionAliveResponse.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KeepTransactionAliveResponse.java
new file mode 100644
index 0000000..333ba97
--- /dev/null
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/KeepTransactionAliveResponse.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import com.google.common.base.Preconditions;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Response for the KeepTransactionAlive RPC. It carries nothing beyond what
+ * the base response class stores: the elapsed time and the UUID of the
+ * server that sent the response.
+ */
[email protected]
+public class KeepTransactionAliveResponse extends KuduRpcResponse {
+  /**
+   * @param elapsedMillis time in milliseconds since RPC creation to now
+   * @param serverUUID UUID of the server that sent the response
+   */
+  KeepTransactionAliveResponse(long elapsedMillis, String serverUUID) {
+    super(elapsedMillis, serverUUID);
+  }
+}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index 28842e7..292ccdb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -155,6 +155,8 @@ public class KuduTransaction implements AutoCloseable {
private boolean keepaliveEnabled = true;
private boolean isInFlight = false;
private final Object isInFlightSync = new Object();
+ private Timeout keepaliveTaskHandle = null;
+ private final Object keepaliveTaskHandleSync = new Object();
/**
* Create an instance of a transaction handle bound to the specified client.
@@ -195,6 +197,9 @@ public class KuduTransaction implements AutoCloseable {
this.txnId = txnId;
this.keepaliveMillis = keepaliveMillis;
this.keepaliveEnabled = keepaliveEnabled;
+
+ startKeepaliveHeartbeating();
+
this.isInFlight = true;
}
@@ -214,6 +219,8 @@ public class KuduTransaction implements AutoCloseable {
// in a synchronous way.
doBeginTransaction();
+ startKeepaliveHeartbeating();
+
// Once the heavy-lifting has successfully completed, mark this instance
// as a handle for an in-flight transaction.
synchronized (isInFlightSync) {
@@ -265,6 +272,17 @@ public class KuduTransaction implements AutoCloseable {
public void commit(boolean wait) throws KuduException {
Preconditions.checkState(isInFlight, "transaction is not open for this
handle");
CommitTransactionRequest req = doCommitTransaction();
+ // Now, there is no need to continue sending keepalive messages: the
+ // transaction should be in COMMIT_IN_PROGRESS state after successful
+ // completion of the calls above, and the backend takes care of everything
+ // else: nothing is required from the client side to successfully complete
+ // the commit phase of the transaction past this point.
+ synchronized (keepaliveTaskHandleSync) {
+ if (keepaliveTaskHandle != null) {
+ LOG.debug("stopping keepalive heartbeating after commit (txn ID {})",
txnId);
+ keepaliveTaskHandle.cancel();
+ }
+ }
if (wait) {
Deferred<GetTransactionStateResponse> txnState =
@@ -319,6 +337,13 @@ public class KuduTransaction implements AutoCloseable {
public void rollback() throws KuduException {
Preconditions.checkState(isInFlight, "transaction is not open for this
handle");
doRollbackTransaction();
+ // Now, there is no need to continue sending keepalive messages.
+ synchronized (keepaliveTaskHandleSync) {
+ if (keepaliveTaskHandle != null) {
+ LOG.debug("stopping keepalive heartbeating after rollback (txn ID
{})", txnId);
+ keepaliveTaskHandle.cancel();
+ }
+ }
// Once everything else is completed successfully, mark the transaction as
// no longer in flight.
@@ -349,7 +374,7 @@ public class KuduTransaction implements AutoCloseable {
* @throws IOException if serialization fails
*/
public byte[] serialize(SerializationOptions options) throws IOException {
- LOG.debug("serializing handle for transaction ID {}", txnId);
+ LOG.debug("serializing handle (txn ID {})", txnId);
Preconditions.checkState(
txnId != AsyncKuduClient.INVALID_TXN_ID,
"invalid transaction handle");
@@ -411,9 +436,11 @@ public class KuduTransaction implements AutoCloseable {
@Override
public void close() {
try {
- if (keepaliveEnabled) {
- LOG.debug("stopping keepalive heartbeating for transaction ID {}",
txnId);
- // TODO(aserbin): stop sending keepalive heartbeats to TxnManager
+ synchronized (keepaliveTaskHandleSync) {
+ if (keepaliveTaskHandle != null) {
+ LOG.debug("stopping keepalive heartbeating (txn ID {})", txnId);
+ keepaliveTaskHandle.cancel();
+ }
}
} catch (Exception e) {
LOG.error("exception while automatically rolling back a transaction", e);
@@ -542,4 +569,100 @@ public class KuduTransaction implements AutoCloseable {
return e;
};
}
+
+ /**
+  * Compute the period (in milliseconds) for sending keepalive messages,
+  * given the keepalive timeout (also in milliseconds). The timeout is
+  * dictated by the backend, which can automatically rollback a transaction
+  * after not receiving keepalive messages for longer than that interval.
+  * Sending one heartbeat per {@code keepaliveMillis} would be enough in an
+  * ideal world, but scheduling irregularities, the precision of the client
+  * node's timer, and network delays make it safer to heartbeat more often:
+  * the result is half of the timeout, but never less than one millisecond.
+  *
+  * @param keepaliveMillis the keepalive timeout interval; must be positive
+  * @return a proper period for sending keepalive messages from the client
+  *         side
+  */
+ private static long keepalivePeriodForTimeout(long keepaliveMillis) {
+   Preconditions.checkArgument(keepaliveMillis > 0,
+       "keepalive timeout must be a positive number");
+   // Integer division yields 0 for a timeout of 1 ms: clamp the result to 1.
+   return Math.max(1L, keepaliveMillis / 2);
+ }
+
+ /**
+  * Start periodic keepalive heartbeating for the transaction if it is
+  * enabled for this handle; otherwise only log that heartbeating is
+  * disabled.
+  */
+ private void startKeepaliveHeartbeating() {
+   if (keepaliveEnabled) {
+     // Arguments follow the order of the placeholders in the message:
+     // the heartbeat period first, then the transaction identifier.
+     LOG.debug("starting keepalive heartbeating with period {} ms (txn ID {})",
+         keepalivePeriodForTimeout(keepaliveMillis), txnId);
+     doStartKeepaliveHeartbeating();
+   } else {
+     LOG.debug("keepalive heartbeating disabled for this handle (txn ID {})", txnId);
+   }
+ }
+
+ /**
+ * Timer task which sends a single keepalive heartbeat for the transaction
+ * of the enclosing {@link KuduTransaction} handle and then schedules itself
+ * to run again, storing the new task handle in {@code keepaliveTaskHandle}.
+ */
+ private final class SendKeepaliveTask implements TimerTask {
+ /**
+ * Send a keepalive heartbeat message for the transaction represented by
+ * this {@link KuduTransaction} handle and re-schedule itself
+ * (i.e. this task) to send the next heartbeat after the keepalive period.
+ * Nothing is sent or re-scheduled if the timeout is already cancelled.
+ * A {@link RecoverableException} while sending is logged and the task is
+ * still re-scheduled; any other exception is logged and ends the
+ * heartbeating since the task is not re-scheduled.
+ *
+ * @param timeout a handle which is associated with this task
+ */
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ // NOTE(review): whether a timer ever reports an already-fired timeout as
+ // cancelled depends on the timer implementation -- confirm this check is
+ // effective for the timer in use.
+ if (timeout.isCancelled()) {
+ LOG.debug("terminating keepalive task (txn ID {})", txnId);
+ return;
+ }
+ try {
+ doSendKeepalive();
+ } catch (RecoverableException e) {
+ // Just continue sending heartbeats as required: the recoverable
+ // exception means the condition is transient.
+ // TODO(aserbin): should we send next heartbeat sooner? E.g., retry
+ // immediately, and do such retry only once after a
+ // failure like this. The idea is to avoid missing
+ // heartbeats in situations where the second attempt
+ // after keepaliveMillis/2 would fail as well due to
+ // a network issue, but immediate retry could succeed.
+ LOG.debug("continuing keepalive heartbeating (txn ID {}): {}",
+ txnId, e.toString());
+ } catch (Exception e) {
+ LOG.debug("terminating keepalive task (txn ID {}) due to exception {}",
+ txnId, e.toString());
+ return;
+ }
+ synchronized (keepaliveTaskHandleSync) {
+ // Re-schedule the task, refreshing the task handle.
+ // NOTE(review): if the previous handle was cancelled (e.g. by close(),
+ // commit() or rollback()) while doSendKeepalive() was in progress, this
+ // still installs a fresh, non-cancelled handle -- verify that
+ // heartbeating cannot outlive the cancellation in that case.
+ keepaliveTaskHandle = AsyncKuduClient.newTimeout(
+ timeout.timer(), this, keepalivePeriodForTimeout(keepaliveMillis));
+ }
+ }
+
+ /**
+ * Build a {@link KeepTransactionAliveRequest} for this handle's transaction
+ * using the client's master table, timer and default admin operation
+ * timeout, send it, and process the outcome via
+ * {@code KuduClient.joinAndHandleException()}.
+ * NOTE(review): this presumably blocks the calling (timer) thread until the
+ * RPC completes -- confirm, and consider an asynchronous variant.
+ *
+ * @throws KuduException if sending the keepalive message fails
+ */
+ private void doSendKeepalive() throws KuduException {
+ KeepTransactionAliveRequest request = new KeepTransactionAliveRequest(
+ client.getMasterTable(),
+ client.getTimer(),
+ client.getDefaultAdminOperationTimeoutMs(),
+ txnId);
+ Deferred<KeepTransactionAliveResponse> d =
client.sendRpcToTablet(request);
+ KuduClient.joinAndHandleException(d);
+ }
+ }
+
+ void doStartKeepaliveHeartbeating() {
+ Preconditions.checkState(keepaliveEnabled);
+ synchronized (keepaliveTaskHandleSync) {
+ Preconditions.checkState(keepaliveTaskHandle == null,
+ "keepalive heartbeating has already started");
+ keepaliveTaskHandle = AsyncKuduClient.newTimeout(
+ client.getTimer(),
+ new SendKeepaliveTask(),
+ keepalivePeriodForTimeout(keepaliveMillis));
+ }
+ }
}
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
index c0bbaf5..7e18dfa 100644
---
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
+++
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -437,21 +437,270 @@ public class TestKuduTransaction {
/**
* Test KuduTransaction to be used in auto-closable manner.
- *
- * TOOD(aserbin): update this once transaction handles send keepalive
messages
*/
@Test(timeout = 100000)
@MasterServerConfig(flags = {
"--txn_manager_enabled=true",
})
public void testAutoclosableUsage() throws Exception {
+ byte[] buf = null;
+
try (KuduTransaction txn = client.newTransaction()) {
- byte[] buf = txn.serialize();
+ buf = txn.serialize();
assertNotNull(buf);
txn.commit(false);
txn.isCommitComplete();
} catch (Exception e) {
fail("unexpected exception: " + e.toString());
}
+
+ try (KuduTransaction txn = KuduTransaction.deserialize(buf, asyncClient)) {
+ buf = txn.serialize();
+ assertNotNull(buf);
+ txn.rollback();
+ } catch (Exception e) {
+ fail("unexpected exception: " + e.toString());
+ }
+
+ // Do this one more time, just to verify that handles created by
+ // the serialize/deserialize sequence behave as expected.
+ try (KuduTransaction txn = KuduTransaction.deserialize(buf, asyncClient)) {
+ buf = txn.serialize();
+ assertNotNull(buf);
+ txn.rollback();
+ } catch (Exception e) {
+ fail("unexpected exception: " + e.toString());
+ }
+
+ {
+ KuduTransaction txn = client.newTransaction();
+ // Explicitly call KuduTransaction.close() more than once to make
+ // sure it's possible to do so and the method's behavior is idempotent.
+ txn.close();
+ txn.close();
+ }
+ }
+
+ /**
+ * Verify that a transaction token created by the
+ * KuduTransaction.serialize() method has keepalive enabled or disabled
+ * as specified by the SerializationOptions.
+ */
+ @Test(timeout = 100000)
+ @MasterServerConfig(flags = {
+ "--txn_manager_enabled=true",
+ })
+ public void testSerializationOptions() throws Exception {
+ final KuduTransaction txn = client.newTransaction();
+
+ // Check the keepalive settings when serializing/deserializing with default
+ // settings for SerializationOptions.
+ {
+ byte[] buf = txn.serialize();
+ TxnTokenPB pb = TxnTokenPB.parseFrom(CodedInputStream.newInstance(buf));
+ assertTrue(pb.hasKeepaliveMillis());
+ assertTrue(pb.getKeepaliveMillis() > 0);
+ assertTrue(pb.hasEnableKeepalive());
+ assertFalse(pb.getEnableKeepalive());
+ }
+
+ // Same as above, but supply an instance of SerializationOptions with
+ // default settings created by the constructor.
+ {
+ KuduTransaction.SerializationOptions options =
+ new KuduTransaction.SerializationOptions();
+ byte[] buf = txn.serialize(options);
+ TxnTokenPB pb = TxnTokenPB.parseFrom(CodedInputStream.newInstance(buf));
+ assertTrue(pb.hasKeepaliveMillis());
+ assertTrue(pb.getKeepaliveMillis() > 0);
+ assertTrue(pb.hasEnableKeepalive());
+ assertFalse(pb.getEnableKeepalive());
+ }
+
+ // Same as above, but explicitly disable keepalive for an instance of
+ // SerializationOptions.
+ {
+ KuduTransaction.SerializationOptions options =
+ new KuduTransaction.SerializationOptions();
+ options.setEnableKeepalive(false);
+ byte[] buf = txn.serialize(options);
+ TxnTokenPB pb = TxnTokenPB.parseFrom(CodedInputStream.newInstance(buf));
+ assertTrue(pb.hasKeepaliveMillis());
+ assertTrue(pb.getKeepaliveMillis() > 0);
+ assertTrue(pb.hasEnableKeepalive());
+ assertFalse(pb.getEnableKeepalive());
+ }
+
+ // Explicitly enable keepalive with SerializationOptions.
+ {
+ KuduTransaction.SerializationOptions options =
+ new KuduTransaction.SerializationOptions();
+ options.setEnableKeepalive(true);
+ byte[] buf = txn.serialize(options);
+ TxnTokenPB pb = TxnTokenPB.parseFrom(CodedInputStream.newInstance(buf));
+ assertTrue(pb.hasKeepaliveMillis());
+ assertTrue(pb.getKeepaliveMillis() > 0);
+ assertTrue(pb.hasEnableKeepalive());
+ assertTrue(pb.getEnableKeepalive());
+ }
+ }
+
+ /**
+ * Test that a KuduTransaction handle created by KuduClient.newTransaction()
+ * automatically sends keepalive messages.
+ */
+ @Test(timeout = 100000)
+ @MasterServerConfig(flags = {
+ "--txn_manager_enabled=true",
+ })
+ @TabletServerConfig(flags = {
+ "--txn_keepalive_interval_ms=200",
+ "--txn_staleness_tracker_interval_ms=50",
+ })
+ public void testKeepaliveBasic() throws Exception {
+ try (KuduTransaction txn = client.newTransaction()) {
+ final byte[] buf = txn.serialize();
+ final TxnTokenPB pb =
TxnTokenPB.parseFrom(CodedInputStream.newInstance(buf));
+ assertTrue(pb.hasKeepaliveMillis());
+ final long keepaliveMillis = pb.getKeepaliveMillis();
+ assertTrue(keepaliveMillis > 0);
+ Thread.sleep(3 * keepaliveMillis);
+ // It should be possible to commit the transaction since it is supposed
+ // to be open at this point even after multiples of the inactivity timeout
+ // interval.
+ txn.commit(false);
+ } catch (Exception e) {
+ fail("unexpected exception: " + e.toString());
+ }
+
+ {
+ KuduTransaction txn = client.newTransaction();
+ final byte[] buf = txn.serialize();
+ final TxnTokenPB pb =
TxnTokenPB.parseFrom(CodedInputStream.newInstance(buf));
+ assertTrue(pb.hasKeepaliveMillis());
+ final long keepaliveMillis = pb.getKeepaliveMillis();
+ assertTrue(keepaliveMillis > 0);
+ // Call KuduTransaction.close() explicitly.
+ txn.close();
+
+ // Keep the handle around without any activity for longer than the
+ // keepalive timeout interval.
+ Thread.sleep(3 * keepaliveMillis);
+
+ // At this point, the underlying transaction should be automatically
+ // aborted by the backend. An attempt to commit the transaction should
+ // fail because the transaction is assumed to be already aborted at this
+ // point.
+ NonRecoverableException ex = assertThrows(
+ NonRecoverableException.class, new ThrowingRunnable() {
+ @Override
+ public void run() throws Throwable {
+ txn.commit(false);
+ }
+ });
+ final String errmsg = ex.getMessage();
+ assertTrue(errmsg, errmsg.matches(
+ ".* transaction ID .* is not open: state: ABORTED .*"));
+
+ // Verify that KuduTransaction.rollback() successfully runs on a
+ // transaction handle if the underlying transaction is already aborted
+ // automatically by the backend. Rolling back the transaction explicitly
+ // should succeed since it's a pure no-op: rolling back a transaction has
+ // idempotent semantics.
+ txn.rollback();
+ }
+ }
+
+ /**
+ * Test that a KuduTransaction handle created by
+ * KuduTransaction.deserialize() automatically sends or doesn't send
+ * keepalive heartbeat messages depending on the SerializationOptions used
+ * while serializing the handle into a transaction token.
+ */
+ @Test(timeout = 100000)
+ @MasterServerConfig(flags = {
+ "--txn_manager_enabled=true",
+ })
+ @TabletServerConfig(flags = {
+ "--txn_keepalive_interval_ms=200",
+ "--txn_staleness_tracker_interval_ms=50",
+ })
+ public void testKeepaliveForDeserializedHandle() throws Exception {
+ // Check the keepalive behavior when serializing/deserializing with default
+ // settings for SerializationOptions.
+ {
+ KuduTransaction txn = client.newTransaction();
+ final byte[] buf = txn.serialize();
+ final TxnTokenPB pb =
TxnTokenPB.parseFrom(CodedInputStream.newInstance(buf));
+ assertTrue(pb.hasKeepaliveMillis());
+ final long keepaliveMillis = pb.getKeepaliveMillis();
+ assertTrue(keepaliveMillis > 0);
+
+ KuduTransaction serdesTxn = KuduTransaction.deserialize(buf,
asyncClient);
+
+ // Call KuduTransaction.close() explicitly to stop sending automatic
+ // keepalive messages from 'txn' handle.
+ txn.close();
+
+ // Keep the handle around without any activity for longer than the
+ // keepalive timeout interval.
+ Thread.sleep(3 * keepaliveMillis);
+
+ // At this point, the underlying transaction should be automatically
+ // aborted by the backend: the 'txn' handle should not send any
+ // heartbeats anymore since it's closed, and the 'serdesTxn' handle
+ // should not be sending any heartbeats.
+ NonRecoverableException ex = assertThrows(
+ NonRecoverableException.class, new ThrowingRunnable() {
+ @Override
+ public void run() throws Throwable {
+ serdesTxn.commit(false);
+ }
+ });
+ final String errmsg = ex.getMessage();
+ assertTrue(errmsg, errmsg.matches(
+ ".* transaction ID .* is not open: state: ABORTED .*"));
+
+ // Verify that KuduTransaction.rollback() successfully runs on both
+ // transaction handles if the underlying transaction is already aborted
+ // automatically by the backend.
+ txn.rollback();
+ serdesTxn.rollback();
+ }
+
+ // Check the keepalive behavior when serializing/deserializing when
+ // keepalive heartbeating is enabled in SerializationOptions used
+ // during the serialization of the original transaction handle.
+ {
+ final KuduTransaction.SerializationOptions options =
+ new KuduTransaction.SerializationOptions();
+ options.setEnableKeepalive(true);
+ KuduTransaction txn = client.newTransaction();
+ final byte[] buf = txn.serialize(options);
+ final TxnTokenPB pb =
TxnTokenPB.parseFrom(CodedInputStream.newInstance(buf));
+ assertTrue(pb.hasKeepaliveMillis());
+ final long keepaliveMillis = pb.getKeepaliveMillis();
+ assertTrue(keepaliveMillis > 0);
+
+ KuduTransaction serdesTxn = KuduTransaction.deserialize(buf,
asyncClient);
+
+ // Call KuduTransaction.close() explicitly to stop sending automatic
+ // keepalive messages by the 'txn' handle.
+ txn.close();
+
+ // Keep the handle around without any activity for longer than the
+ // keepalive timeout interval.
+ Thread.sleep(3 * keepaliveMillis);
+
+ // At this point, the underlying transaction should be kept open
+ // because the 'serdesTxn' handle sends keepalive heartbeats even if the
+ // original handle ceased to send those after calling 'close()' on it.
+ // As an extra sanity check, call 'commit()' and 'isCommitComplete()'
+ // on both handles to make sure no exception is thrown.
+ serdesTxn.commit(false);
+ serdesTxn.isCommitComplete();
+ txn.commit(false);
+ txn.isCommitComplete();
+ }
}
}