Repository: hive Updated Branches: refs/heads/master 17f759d63 -> 3301b92bc
HIVE-11228 - Mutation API should use semi-shared locks. (Elliot West, via Eugene Koifman) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3301b92b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3301b92b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3301b92b Branch: refs/heads/master Commit: 3301b92bcb2a1f779e76d174cd9ac6d83fc66938 Parents: 17f759d Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Mon Jul 13 09:42:07 2015 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Mon Jul 13 09:42:26 2015 -0700 ---------------------------------------------------------------------- .../streaming/mutate/client/MutatorClient.java | 11 +- .../streaming/mutate/client/lock/Lock.java | 73 +++++++---- .../hive/hcatalog/streaming/mutate/package.html | 8 +- .../streaming/mutate/client/lock/TestLock.java | 121 ++++++++++++------- 4 files changed, 136 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/3301b92b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java index 2724525..29b828d 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java @@ -42,7 +42,16 @@ public class MutatorClient implements Closeable { .lockFailureListener(lockFailureListener == null ? LockFailureListener.NULL_LISTENER : lockFailureListener) .user(user); for (AcidTable table : tables) { - lockOptions.addTable(table.getDatabaseName(), table.getTableName()); + switch (table.getTableType()) { + case SOURCE: + lockOptions.addSourceTable(table.getDatabaseName(), table.getTableName()); + break; + case SINK: + lockOptions.addSinkTable(table.getDatabaseName(), table.getTableName()); + break; + default: + throw new IllegalArgumentException("Unknown TableType: " + table.getTableType()); + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/3301b92b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java index 21604df..ad0b303 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java @@ -2,6 +2,7 @@ package org.apache.hive.hcatalog.streaming.mutate.client.lock; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -35,7 +36,8 @@ public class Lock { private final IMetaStoreClient metaStoreClient; private final HeartbeatFactory heartbeatFactory; private final LockFailureListener listener; - private final Collection<Table> tableDescriptors; + private final Collection<Table> sinks; + private final Collection<Table> tables = new HashSet<>(); private final int lockRetries; private final int retryWaitSeconds; private final String user; @@ -46,23 +48,26 @@ public class Lock { private Long transactionId; public Lock(IMetaStoreClient metaStoreClient, Options options) { - this(metaStoreClient, new HeartbeatFactory(), options.hiveConf, options.listener, options.user, - options.descriptors, options.lockRetries, options.retryWaitSeconds); + this(metaStoreClient, new HeartbeatFactory(), options.hiveConf, options.listener, options.user, options.sources, + options.sinks, options.lockRetries, options.retryWaitSeconds); } /** Visible for testing only. */ Lock(IMetaStoreClient metaStoreClient, HeartbeatFactory heartbeatFactory, HiveConf hiveConf, - LockFailureListener listener, String user, Collection<Table> tableDescriptors, int lockRetries, + LockFailureListener listener, String user, Collection<Table> sources, Collection<Table> sinks, int lockRetries, int retryWaitSeconds) { this.metaStoreClient = metaStoreClient; this.heartbeatFactory = heartbeatFactory; this.hiveConf = hiveConf; this.user = user; - this.tableDescriptors = tableDescriptors; this.listener = listener; this.lockRetries = lockRetries; this.retryWaitSeconds = retryWaitSeconds; + this.sinks = sinks; + tables.addAll(sources); + tables.addAll(sinks); + if (LockFailureListener.NULL_LISTENER.equals(listener)) { LOG.warn("No {} supplied. Data quality and availability cannot be assured.", LockFailureListener.class.getSimpleName()); @@ -77,6 +82,9 @@ public class Lock { /** Attempts to acquire a read lock on the table, returns if successful, throws exception otherwise. */ public void acquire(long transactionId) throws LockException { + if (transactionId <= 0) { + throw new IllegalArgumentException("Invalid transaction id: " + transactionId); + } lockId = internalAcquire(transactionId); this.transactionId = transactionId; initiateHeartbeat(); @@ -96,19 +104,18 @@ public class Lock { @Override public String toString() { - return "Lock [metaStoreClient=" + metaStoreClient + ", lockId=" + lockId + ", transactionId=" + transactionId - + "]"; + return "Lock [metaStoreClient=" + metaStoreClient + ", lockId=" + lockId + ", transactionId=" + transactionId + "]"; } private long internalAcquire(Long transactionId) throws LockException { int attempts = 0; - LockRequest request = buildSharedLockRequest(transactionId); + LockRequest request = buildLockRequest(transactionId); do { LockResponse response = null; try { response = metaStoreClient.lock(request); } catch (TException e) { - throw new LockException("Unable to acquire lock for tables: [" + join(tableDescriptors) + "]", e); + throw new LockException("Unable to acquire lock for tables: [" + join(tables) + "]", e); } if (response != null) { LockState state = response.getState(); @@ -129,7 +136,7 @@ public class Lock { } attempts++; } while (attempts < lockRetries); - throw new LockException("Could not acquire lock on tables: [" + join(tableDescriptors) + "]"); + throw new LockException("Could not acquire lock on tables: [" + join(tables) + "]"); } private void internalRelease() { @@ -142,18 +149,24 @@ public class Lock { } } catch (TException e) { LOG.error("Lock " + lockId + " failed.", e); - listener.lockFailed(lockId, transactionId, asStrings(tableDescriptors), e); + listener.lockFailed(lockId, transactionId, asStrings(tables), e); } } - private LockRequest buildSharedLockRequest(Long transactionId) { + private LockRequest buildLockRequest(Long transactionId) { + if (transactionId == null && !sinks.isEmpty()) { + throw new IllegalArgumentException("Cannot sink to tables outside of a transaction: sinks=" + asStrings(sinks)); + } LockRequestBuilder requestBuilder = new LockRequestBuilder(); - for (Table descriptor : tableDescriptors) { - LockComponent component = new LockComponentBuilder() - .setDbName(descriptor.getDbName()) - .setTableName(descriptor.getTableName()) - .setShared() - .build(); + for (Table table : tables) { + LockComponentBuilder componentBuilder = new LockComponentBuilder().setDbName(table.getDbName()).setTableName( + table.getTableName()); + if (sinks.contains(table)) { + componentBuilder.setSemiShared(); + } else { + componentBuilder.setShared(); + } + LockComponent component = componentBuilder.build(); requestBuilder.addLockComponent(component); } if (transactionId != null) { @@ -166,8 +179,7 @@ public class Lock { private void initiateHeartbeat() { int heartbeatPeriod = getHeartbeatPeriod(); LOG.debug("Heartbeat period {}s", heartbeatPeriod); - heartbeat = heartbeatFactory.newInstance(metaStoreClient, listener, transactionId, tableDescriptors, lockId, - heartbeatPeriod); + heartbeat = heartbeatFactory.newInstance(metaStoreClient, listener, transactionId, tables, lockId, heartbeatPeriod); } private int getHeartbeatPeriod() { @@ -210,22 +222,33 @@ public class Lock { /** Constructs a lock options for a set of Hive ACID tables from which we wish to read. */ public static final class Options { - Set<Table> descriptors = new LinkedHashSet<>(); + Set<Table> sources = new LinkedHashSet<>(); + Set<Table> sinks = new LinkedHashSet<>(); LockFailureListener listener = LockFailureListener.NULL_LISTENER; int lockRetries = 5; int retryWaitSeconds = 30; String user; HiveConf hiveConf; - /** Adds a table for which a shared read lock will be requested. */ - public Options addTable(String databaseName, String tableName) { + /** Adds a table for which a shared lock will be requested. */ + public Options addSourceTable(String databaseName, String tableName) { + addTable(databaseName, tableName, sources); + return this; + } + + /** Adds a table for which a semi-shared lock will be requested. */ + public Options addSinkTable(String databaseName, String tableName) { + addTable(databaseName, tableName, sinks); + return this; + } + + private void addTable(String databaseName, String tableName, Set<Table> tables) { checkNotNullOrEmpty(databaseName); checkNotNullOrEmpty(tableName); Table table = new Table(); table.setDbName(databaseName); table.setTableName(tableName); - descriptors.add(table); - return this; + tables.add(table); } public Options user(String user) { http://git-wip-us.apache.org/repos/asf/hive/blob/3301b92b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html index 9fc10b6..09a55b6 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html @@ -421,7 +421,7 @@ automatically (say on a hourly basis). In such cases requiring the Hive admin to pre-create the necessary partitions may not be reasonable. Consequently the API allows coordinators to create partitions as needed (see: -<code>MutatorClientBuilder.addTable(String, String, boolean)</code> +<code>MutatorClientBuilder.addSinkTable(String, String, boolean)</code> ). Partition creation being an atomic action, multiple coordinators can race to create the partition, but only one would succeed, so coordinators clients need not synchronize when creating a partition. The @@ -440,14 +440,14 @@ consistent manner requires the following: <ol> <li>Obtaining a valid transaction list from the meta store (<code>ValidTxnList</code>). </li> -<li>Acquiring a read-lock with the meta store and issuing -heartbeats (<code>LockImpl</code> can help with this). +<li>Acquiring a lock with the meta store and issuing heartbeats (<code>LockImpl</code> +can help with this). </li> <li>Configuring the <code>OrcInputFormat</code> and then reading the data. Make sure that you also pull in the <code>ROW__ID</code> values. See: <code>AcidRecordReader.getRecordIdentifier</code>. </li> -<li>Releasing the read-lock.</li> +<li>Releasing the lock.</li> </ol> </p> http://git-wip-us.apache.org/repos/asf/hive/blob/3301b92b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java index ef1e80c..05f342b 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java @@ -19,7 +19,9 @@ import static org.mockito.Mockito.when; import java.net.InetAddress; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.Timer; import org.apache.hadoop.hive.conf.HiveConf; @@ -42,14 +44,17 @@ import org.mockito.Captor; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; @RunWith(MockitoJUnitRunner.class) public class TestLock { - private static final Table TABLE_1 = createTable("DB", "ONE"); - private static final Table TABLE_2 = createTable("DB", "TWO"); - private static final List<Table> TABLES = ImmutableList.of(TABLE_1, TABLE_2); + private static final Table SOURCE_TABLE_1 = createTable("DB", "SOURCE_1"); + private static final Table SOURCE_TABLE_2 = createTable("DB", "SOURCE_2"); + private static final Table SINK_TABLE = createTable("DB", "SINK"); + private static final Set<Table> SOURCES = ImmutableSet.of(SOURCE_TABLE_1, SOURCE_TABLE_2); + private static final Set<Table> SINKS = ImmutableSet.of(SINK_TABLE); + private static final Set<Table> TABLES = ImmutableSet.of(SOURCE_TABLE_1, SOURCE_TABLE_2, SINK_TABLE); private static final long LOCK_ID = 42; private static final long TRANSACTION_ID = 109; private static final String USER = "ewest"; @@ -67,7 +72,8 @@ public class TestLock { @Captor private ArgumentCaptor<LockRequest> requestCaptor; - private Lock lock; + private Lock readLock; + private Lock writeLock; private HiveConf configuration = new HiveConf(); @Before @@ -79,44 +85,57 @@ public class TestLock { mockHeartbeatFactory.newInstance(any(IMetaStoreClient.class), any(LockFailureListener.class), any(Long.class), any(Collection.class), anyLong(), anyInt())).thenReturn(mockHeartbeat); - lock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, configuration, mockListener, USER, TABLES, 3, 0); + readLock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, configuration, mockListener, USER, SOURCES, + Collections.<Table> emptySet(), 3, 0); + writeLock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, configuration, mockListener, USER, SOURCES, SINKS, + 3, 0); } @Test public void testAcquireReadLockWithNoIssues() throws Exception { - lock.acquire(); - assertEquals(Long.valueOf(LOCK_ID), lock.getLockId()); - assertNull(lock.getTransactionId()); + readLock.acquire(); + assertEquals(Long.valueOf(LOCK_ID), readLock.getLockId()); + assertNull(readLock.getTransactionId()); + } + + @Test(expected = IllegalArgumentException.class) + public void testAcquireWriteLockWithoutTxn() throws Exception { + writeLock.acquire(); + } + + @Test(expected = IllegalArgumentException.class) + public void testAcquireWriteLockWithInvalidTxn() throws Exception { + writeLock.acquire(0); } @Test public void testAcquireTxnLockWithNoIssues() throws Exception { - lock.acquire(TRANSACTION_ID); - assertEquals(Long.valueOf(LOCK_ID), lock.getLockId()); - assertEquals(Long.valueOf(TRANSACTION_ID), lock.getTransactionId()); + writeLock.acquire(TRANSACTION_ID); + assertEquals(Long.valueOf(LOCK_ID), writeLock.getLockId()); + assertEquals(Long.valueOf(TRANSACTION_ID), writeLock.getTransactionId()); } @Test public void testAcquireReadLockCheckHeartbeatCreated() throws Exception { configuration.set("hive.txn.timeout", "100s"); - lock.acquire(); + readLock.acquire(); - verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), any(Long.class), eq(TABLES), + verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), any(Long.class), eq(SOURCES), eq(LOCK_ID), eq(75)); } @Test public void testAcquireTxnLockCheckHeartbeatCreated() throws Exception { configuration.set("hive.txn.timeout", "100s"); - lock.acquire(TRANSACTION_ID); + writeLock.acquire(TRANSACTION_ID); - verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), eq(TRANSACTION_ID), eq(TABLES), - eq(LOCK_ID), eq(75)); + verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), eq(TRANSACTION_ID), + eq(TABLES), eq(LOCK_ID), eq(75)); } @Test public void testAcquireLockCheckUser() throws Exception { - lock.acquire(); + readLock.acquire(); verify(mockMetaStoreClient).lock(requestCaptor.capture()); LockRequest actualRequest = requestCaptor.getValue(); assertEquals(USER, actualRequest.getUser()); @@ -124,7 +143,7 @@ public class TestLock { @Test public void testAcquireReadLockCheckLocks() throws Exception { - lock.acquire(); + readLock.acquire(); verify(mockMetaStoreClient).lock(requestCaptor.capture()); LockRequest request = requestCaptor.getValue(); @@ -137,17 +156,17 @@ public class TestLock { assertEquals(2, components.size()); LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB"); - expected1.setTablename("ONE"); + expected1.setTablename("SOURCE_1"); assertTrue(components.contains(expected1)); LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB"); - expected2.setTablename("TWO"); + expected2.setTablename("SOURCE_2"); assertTrue(components.contains(expected2)); } @Test public void testAcquireTxnLockCheckLocks() throws Exception { - lock.acquire(TRANSACTION_ID); + writeLock.acquire(TRANSACTION_ID); verify(mockMetaStoreClient).lock(requestCaptor.capture()); LockRequest request = requestCaptor.getValue(); @@ -157,73 +176,77 @@ public class TestLock { List<LockComponent> components = request.getComponent(); - System.out.println(components); - assertEquals(2, components.size()); + assertEquals(3, components.size()); LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB"); - expected1.setTablename("ONE"); + expected1.setTablename("SOURCE_1"); assertTrue(components.contains(expected1)); LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB"); - expected2.setTablename("TWO"); + expected2.setTablename("SOURCE_2"); assertTrue(components.contains(expected2)); + + LockComponent expected3 = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "DB"); + expected3.setTablename("SINK"); + assertTrue(components.contains(expected3)); } @Test(expected = LockException.class) public void testAcquireLockNotAcquired() throws Exception { when(mockLockResponse.getState()).thenReturn(NOT_ACQUIRED); - lock.acquire(); + readLock.acquire(); } @Test(expected = LockException.class) public void testAcquireLockAborted() throws Exception { when(mockLockResponse.getState()).thenReturn(ABORT); - lock.acquire(); + readLock.acquire(); } @Test(expected = LockException.class) public void testAcquireLockWithWaitRetriesExceeded() throws Exception { when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, WAITING); - lock.acquire(); + readLock.acquire(); } @Test public void testAcquireLockWithWaitRetries() throws Exception { when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, ACQUIRED); - lock.acquire(); - assertEquals(Long.valueOf(LOCK_ID), lock.getLockId()); + readLock.acquire(); + assertEquals(Long.valueOf(LOCK_ID), readLock.getLockId()); } @Test public void testReleaseLock() throws Exception { - lock.acquire(); - lock.release(); + readLock.acquire(); + readLock.release(); verify(mockMetaStoreClient).unlock(LOCK_ID); } @Test public void testReleaseLockNoLock() throws Exception { - lock.release(); + readLock.release(); verifyNoMoreInteractions(mockMetaStoreClient); } @Test public void testReleaseLockCancelsHeartbeat() throws Exception { - lock.acquire(); - lock.release(); + readLock.acquire(); + readLock.release(); verify(mockHeartbeat).cancel(); } @Test public void testReadHeartbeat() throws Exception { - HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID); + HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, SOURCES, LOCK_ID); task.run(); verify(mockMetaStoreClient).heartbeat(0, LOCK_ID); } @Test public void testTxnHeartbeat() throws Exception { - HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID); + HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES, + LOCK_ID); task.run(); verify(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID); } @@ -232,43 +255,47 @@ public class TestLock { public void testReadHeartbeatFailsNoSuchLockException() throws Exception { Throwable t = new NoSuchLockException(); doThrow(t).when(mockMetaStoreClient).heartbeat(0, LOCK_ID); - HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID); + HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, SOURCES, LOCK_ID); task.run(); - verify(mockListener).lockFailed(LOCK_ID, null, Lock.asStrings(TABLES), t); + verify(mockListener).lockFailed(LOCK_ID, null, Lock.asStrings(SOURCES), t); } @Test public void testTxnHeartbeatFailsNoSuchLockException() throws Exception { Throwable t = new NoSuchLockException(); doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID); - HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID); + HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES, + LOCK_ID); task.run(); - verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t); + verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(SOURCES), t); } @Test public void testHeartbeatFailsNoSuchTxnException() throws Exception { Throwable t = new NoSuchTxnException(); doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID); - HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID); + HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES, + LOCK_ID); task.run(); - verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t); + verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(SOURCES), t); } @Test public void testHeartbeatFailsTxnAbortedException() throws Exception { Throwable t = new TxnAbortedException(); doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID); - HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID); + HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES, + LOCK_ID); task.run(); - verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t); + verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(SOURCES), t); } @Test public void testHeartbeatContinuesTException() throws Exception { Throwable t = new TException(); doThrow(t).when(mockMetaStoreClient).heartbeat(0, LOCK_ID); - HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID); + HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES, + LOCK_ID); task.run(); verifyZeroInteractions(mockListener); }