Repository: hive
Updated Branches:
  refs/heads/master 17f759d63 -> 3301b92bc


HIVE-11228 - Mutation API should use semi-shared locks. (Elliot West, via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3301b92b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3301b92b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3301b92b

Branch: refs/heads/master
Commit: 3301b92bcb2a1f779e76d174cd9ac6d83fc66938
Parents: 17f759d
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Mon Jul 13 09:42:07 2015 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Mon Jul 13 09:42:26 2015 -0700

----------------------------------------------------------------------
 .../streaming/mutate/client/MutatorClient.java  |  11 +-
 .../streaming/mutate/client/lock/Lock.java      |  73 +++++++----
 .../hive/hcatalog/streaming/mutate/package.html |   8 +-
 .../streaming/mutate/client/lock/TestLock.java  | 121 ++++++++++++-------
 4 files changed, 136 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3301b92b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
index 2724525..29b828d 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
@@ -42,7 +42,16 @@ public class MutatorClient implements Closeable {
         .lockFailureListener(lockFailureListener == null ? 
LockFailureListener.NULL_LISTENER : lockFailureListener)
         .user(user);
     for (AcidTable table : tables) {
-      lockOptions.addTable(table.getDatabaseName(), table.getTableName());
+      switch (table.getTableType()) {
+      case SOURCE:
+        lockOptions.addSourceTable(table.getDatabaseName(), 
table.getTableName());
+        break;
+      case SINK:
+        lockOptions.addSinkTable(table.getDatabaseName(), 
table.getTableName());
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown TableType: " + 
table.getTableType());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3301b92b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
index 21604df..ad0b303 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
@@ -2,6 +2,7 @@ package org.apache.hive.hcatalog.streaming.mutate.client.lock;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
@@ -35,7 +36,8 @@ public class Lock {
   private final IMetaStoreClient metaStoreClient;
   private final HeartbeatFactory heartbeatFactory;
   private final LockFailureListener listener;
-  private final Collection<Table> tableDescriptors;
+  private final Collection<Table> sinks;
+  private final Collection<Table> tables = new HashSet<>();
   private final int lockRetries;
   private final int retryWaitSeconds;
   private final String user;
@@ -46,23 +48,26 @@ public class Lock {
   private Long transactionId;
 
   public Lock(IMetaStoreClient metaStoreClient, Options options) {
-    this(metaStoreClient, new HeartbeatFactory(), options.hiveConf, 
options.listener, options.user,
-        options.descriptors, options.lockRetries, options.retryWaitSeconds);
+    this(metaStoreClient, new HeartbeatFactory(), options.hiveConf, 
options.listener, options.user, options.sources,
+        options.sinks, options.lockRetries, options.retryWaitSeconds);
   }
 
   /** Visible for testing only. */
   Lock(IMetaStoreClient metaStoreClient, HeartbeatFactory heartbeatFactory, 
HiveConf hiveConf,
-      LockFailureListener listener, String user, Collection<Table> 
tableDescriptors, int lockRetries,
+      LockFailureListener listener, String user, Collection<Table> sources, 
Collection<Table> sinks, int lockRetries,
       int retryWaitSeconds) {
     this.metaStoreClient = metaStoreClient;
     this.heartbeatFactory = heartbeatFactory;
     this.hiveConf = hiveConf;
     this.user = user;
-    this.tableDescriptors = tableDescriptors;
     this.listener = listener;
     this.lockRetries = lockRetries;
     this.retryWaitSeconds = retryWaitSeconds;
 
+    this.sinks = sinks;
+    tables.addAll(sources);
+    tables.addAll(sinks);
+
     if (LockFailureListener.NULL_LISTENER.equals(listener)) {
       LOG.warn("No {} supplied. Data quality and availability cannot be 
assured.",
           LockFailureListener.class.getSimpleName());
@@ -77,6 +82,9 @@ public class Lock {
 
   /** Attempts to acquire a read lock on the table, returns if successful, 
throws exception otherwise. */
   public void acquire(long transactionId) throws LockException {
+    if (transactionId <= 0) {
+      throw new IllegalArgumentException("Invalid transaction id: " + 
transactionId);
+    }
     lockId = internalAcquire(transactionId);
     this.transactionId = transactionId;
     initiateHeartbeat();
@@ -96,19 +104,18 @@ public class Lock {
 
   @Override
   public String toString() {
-    return "Lock [metaStoreClient=" + metaStoreClient + ", lockId=" + lockId + 
", transactionId=" + transactionId
-        + "]";
+    return "Lock [metaStoreClient=" + metaStoreClient + ", lockId=" + lockId + 
", transactionId=" + transactionId + "]";
   }
 
   private long internalAcquire(Long transactionId) throws LockException {
     int attempts = 0;
-    LockRequest request = buildSharedLockRequest(transactionId);
+    LockRequest request = buildLockRequest(transactionId);
     do {
       LockResponse response = null;
       try {
         response = metaStoreClient.lock(request);
       } catch (TException e) {
-        throw new LockException("Unable to acquire lock for tables: [" + 
join(tableDescriptors) + "]", e);
+        throw new LockException("Unable to acquire lock for tables: [" + 
join(tables) + "]", e);
       }
       if (response != null) {
         LockState state = response.getState();
@@ -129,7 +136,7 @@ public class Lock {
       }
       attempts++;
     } while (attempts < lockRetries);
-    throw new LockException("Could not acquire lock on tables: [" + 
join(tableDescriptors) + "]");
+    throw new LockException("Could not acquire lock on tables: [" + 
join(tables) + "]");
   }
 
   private void internalRelease() {
@@ -142,18 +149,24 @@ public class Lock {
       }
     } catch (TException e) {
       LOG.error("Lock " + lockId + " failed.", e);
-      listener.lockFailed(lockId, transactionId, asStrings(tableDescriptors), 
e);
+      listener.lockFailed(lockId, transactionId, asStrings(tables), e);
     }
   }
 
-  private LockRequest buildSharedLockRequest(Long transactionId) {
+  private LockRequest buildLockRequest(Long transactionId) {
+    if (transactionId == null && !sinks.isEmpty()) {
+      throw new IllegalArgumentException("Cannot sink to tables outside of a 
transaction: sinks=" + asStrings(sinks));
+    }
     LockRequestBuilder requestBuilder = new LockRequestBuilder();
-    for (Table descriptor : tableDescriptors) {
-      LockComponent component = new LockComponentBuilder()
-          .setDbName(descriptor.getDbName())
-          .setTableName(descriptor.getTableName())
-          .setShared()
-          .build();
+    for (Table table : tables) {
+      LockComponentBuilder componentBuilder = new 
LockComponentBuilder().setDbName(table.getDbName()).setTableName(
+          table.getTableName());
+      if (sinks.contains(table)) {
+        componentBuilder.setSemiShared();
+      } else {
+        componentBuilder.setShared();
+      }
+      LockComponent component = componentBuilder.build();
       requestBuilder.addLockComponent(component);
     }
     if (transactionId != null) {
@@ -166,8 +179,7 @@ public class Lock {
   private void initiateHeartbeat() {
     int heartbeatPeriod = getHeartbeatPeriod();
     LOG.debug("Heartbeat period {}s", heartbeatPeriod);
-    heartbeat = heartbeatFactory.newInstance(metaStoreClient, listener, 
transactionId, tableDescriptors, lockId,
-        heartbeatPeriod);
+    heartbeat = heartbeatFactory.newInstance(metaStoreClient, listener, 
transactionId, tables, lockId, heartbeatPeriod);
   }
 
   private int getHeartbeatPeriod() {
@@ -210,22 +222,33 @@ public class Lock {
 
   /** Constructs a lock options for a set of Hive ACID tables from which we 
wish to read. */
   public static final class Options {
-    Set<Table> descriptors = new LinkedHashSet<>();
+    Set<Table> sources = new LinkedHashSet<>();
+    Set<Table> sinks = new LinkedHashSet<>();
     LockFailureListener listener = LockFailureListener.NULL_LISTENER;
     int lockRetries = 5;
     int retryWaitSeconds = 30;
     String user;
     HiveConf hiveConf;
 
-    /** Adds a table for which a shared read lock will be requested. */
-    public Options addTable(String databaseName, String tableName) {
+    /** Adds a table for which a shared lock will be requested. */
+    public Options addSourceTable(String databaseName, String tableName) {
+      addTable(databaseName, tableName, sources);
+      return this;
+    }
+
+    /** Adds a table for which a semi-shared lock will be requested. */
+    public Options addSinkTable(String databaseName, String tableName) {
+      addTable(databaseName, tableName, sinks);
+      return this;
+    }
+
+    private void addTable(String databaseName, String tableName, Set<Table> 
tables) {
       checkNotNullOrEmpty(databaseName);
       checkNotNullOrEmpty(tableName);
       Table table = new Table();
       table.setDbName(databaseName);
       table.setTableName(tableName);
-      descriptors.add(table);
-      return this;
+      tables.add(table);
     }
 
     public Options user(String user) {

http://git-wip-us.apache.org/repos/asf/hive/blob/3301b92b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
index 9fc10b6..09a55b6 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
@@ -421,7 +421,7 @@ automatically (say on a hourly basis). In such cases requiring the Hive
 admin to pre-create the necessary partitions may not be reasonable.
 Consequently the API allows coordinators to create partitions as needed
 (see:
-<code>MutatorClientBuilder.addTable(String, String, boolean)</code>
+<code>MutatorClientBuilder.addSinkTable(String, String, boolean)</code>
 ). Partition creation being an atomic action, multiple coordinators can
 race to create the partition, but only one would succeed, so
 coordinators clients need not synchronize when creating a partition. The
@@ -440,14 +440,14 @@ consistent manner requires the following:
 <ol>
 <li>Obtaining a valid transaction list from the meta store 
(<code>ValidTxnList</code>).
 </li>
-<li>Acquiring a read-lock with the meta store and issuing
-heartbeats (<code>LockImpl</code> can help with this).
+<li>Acquiring a lock with the meta store and issuing heartbeats 
(<code>LockImpl</code>
+can help with this).
 </li>
 <li>Configuring the <code>OrcInputFormat</code> and then reading
 the data. Make sure that you also pull in the <code>ROW__ID</code>
 values. See: <code>AcidRecordReader.getRecordIdentifier</code>.
 </li>
-<li>Releasing the read-lock.</li>
+<li>Releasing the lock.</li>
 </ol>
 </p>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3301b92b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
index ef1e80c..05f342b 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
@@ -19,7 +19,9 @@ import static org.mockito.Mockito.when;
 
 import java.net.InetAddress;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.Timer;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -42,14 +44,17 @@ import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 @RunWith(MockitoJUnitRunner.class)
 public class TestLock {
 
-  private static final Table TABLE_1 = createTable("DB", "ONE");
-  private static final Table TABLE_2 = createTable("DB", "TWO");
-  private static final List<Table> TABLES = ImmutableList.of(TABLE_1, TABLE_2);
+  private static final Table SOURCE_TABLE_1 = createTable("DB", "SOURCE_1");
+  private static final Table SOURCE_TABLE_2 = createTable("DB", "SOURCE_2");
+  private static final Table SINK_TABLE = createTable("DB", "SINK");
+  private static final Set<Table> SOURCES = ImmutableSet.of(SOURCE_TABLE_1, 
SOURCE_TABLE_2);
+  private static final Set<Table> SINKS = ImmutableSet.of(SINK_TABLE);
+  private static final Set<Table> TABLES = ImmutableSet.of(SOURCE_TABLE_1, 
SOURCE_TABLE_2, SINK_TABLE);
   private static final long LOCK_ID = 42;
   private static final long TRANSACTION_ID = 109;
   private static final String USER = "ewest";
@@ -67,7 +72,8 @@ public class TestLock {
   @Captor
   private ArgumentCaptor<LockRequest> requestCaptor;
 
-  private Lock lock;
+  private Lock readLock;
+  private Lock writeLock;
   private HiveConf configuration = new HiveConf();
 
   @Before
@@ -79,44 +85,57 @@ public class TestLock {
         mockHeartbeatFactory.newInstance(any(IMetaStoreClient.class), 
any(LockFailureListener.class), any(Long.class),
             any(Collection.class), anyLong(), 
anyInt())).thenReturn(mockHeartbeat);
 
-    lock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, configuration, 
mockListener, USER, TABLES, 3, 0);
+    readLock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, 
configuration, mockListener, USER, SOURCES,
+        Collections.<Table> emptySet(), 3, 0);
+    writeLock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, 
configuration, mockListener, USER, SOURCES, SINKS,
+        3, 0);
   }
 
   @Test
   public void testAcquireReadLockWithNoIssues() throws Exception {
-    lock.acquire();
-    assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
-    assertNull(lock.getTransactionId());
+    readLock.acquire();
+    assertEquals(Long.valueOf(LOCK_ID), readLock.getLockId());
+    assertNull(readLock.getTransactionId());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testAcquireWriteLockWithoutTxn() throws Exception {
+    writeLock.acquire();
+  }
+  
+  @Test(expected = IllegalArgumentException.class)
+  public void testAcquireWriteLockWithInvalidTxn() throws Exception {
+    writeLock.acquire(0);
   }
 
   @Test
   public void testAcquireTxnLockWithNoIssues() throws Exception {
-    lock.acquire(TRANSACTION_ID);
-    assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
-    assertEquals(Long.valueOf(TRANSACTION_ID), lock.getTransactionId());
+    writeLock.acquire(TRANSACTION_ID);
+    assertEquals(Long.valueOf(LOCK_ID), writeLock.getLockId());
+    assertEquals(Long.valueOf(TRANSACTION_ID), writeLock.getTransactionId());
   }
 
   @Test
   public void testAcquireReadLockCheckHeartbeatCreated() throws Exception {
     configuration.set("hive.txn.timeout", "100s");
-    lock.acquire();
+    readLock.acquire();
 
-    verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), 
eq(mockListener), any(Long.class), eq(TABLES),
+    verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), 
eq(mockListener), any(Long.class), eq(SOURCES),
         eq(LOCK_ID), eq(75));
   }
 
   @Test
   public void testAcquireTxnLockCheckHeartbeatCreated() throws Exception {
     configuration.set("hive.txn.timeout", "100s");
-    lock.acquire(TRANSACTION_ID);
+    writeLock.acquire(TRANSACTION_ID);
 
-    verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), 
eq(mockListener), eq(TRANSACTION_ID), eq(TABLES),
-        eq(LOCK_ID), eq(75));
+    verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), 
eq(mockListener), eq(TRANSACTION_ID),
+        eq(TABLES), eq(LOCK_ID), eq(75));
   }
 
   @Test
   public void testAcquireLockCheckUser() throws Exception {
-    lock.acquire();
+    readLock.acquire();
     verify(mockMetaStoreClient).lock(requestCaptor.capture());
     LockRequest actualRequest = requestCaptor.getValue();
     assertEquals(USER, actualRequest.getUser());
@@ -124,7 +143,7 @@ public class TestLock {
 
   @Test
   public void testAcquireReadLockCheckLocks() throws Exception {
-    lock.acquire();
+    readLock.acquire();
     verify(mockMetaStoreClient).lock(requestCaptor.capture());
 
     LockRequest request = requestCaptor.getValue();
@@ -137,17 +156,17 @@ public class TestLock {
     assertEquals(2, components.size());
 
     LockComponent expected1 = new LockComponent(LockType.SHARED_READ, 
LockLevel.TABLE, "DB");
-    expected1.setTablename("ONE");
+    expected1.setTablename("SOURCE_1");
     assertTrue(components.contains(expected1));
 
     LockComponent expected2 = new LockComponent(LockType.SHARED_READ, 
LockLevel.TABLE, "DB");
-    expected2.setTablename("TWO");
+    expected2.setTablename("SOURCE_2");
     assertTrue(components.contains(expected2));
   }
 
   @Test
   public void testAcquireTxnLockCheckLocks() throws Exception {
-    lock.acquire(TRANSACTION_ID);
+    writeLock.acquire(TRANSACTION_ID);
     verify(mockMetaStoreClient).lock(requestCaptor.capture());
 
     LockRequest request = requestCaptor.getValue();
@@ -157,73 +176,77 @@ public class TestLock {
 
     List<LockComponent> components = request.getComponent();
 
-    System.out.println(components);
-    assertEquals(2, components.size());
+    assertEquals(3, components.size());
 
     LockComponent expected1 = new LockComponent(LockType.SHARED_READ, 
LockLevel.TABLE, "DB");
-    expected1.setTablename("ONE");
+    expected1.setTablename("SOURCE_1");
     assertTrue(components.contains(expected1));
 
     LockComponent expected2 = new LockComponent(LockType.SHARED_READ, 
LockLevel.TABLE, "DB");
-    expected2.setTablename("TWO");
+    expected2.setTablename("SOURCE_2");
     assertTrue(components.contains(expected2));
+
+    LockComponent expected3 = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.TABLE, "DB");
+    expected3.setTablename("SINK");
+    assertTrue(components.contains(expected3));
   }
 
   @Test(expected = LockException.class)
   public void testAcquireLockNotAcquired() throws Exception {
     when(mockLockResponse.getState()).thenReturn(NOT_ACQUIRED);
-    lock.acquire();
+    readLock.acquire();
   }
 
   @Test(expected = LockException.class)
   public void testAcquireLockAborted() throws Exception {
     when(mockLockResponse.getState()).thenReturn(ABORT);
-    lock.acquire();
+    readLock.acquire();
   }
 
   @Test(expected = LockException.class)
   public void testAcquireLockWithWaitRetriesExceeded() throws Exception {
     when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, WAITING);
-    lock.acquire();
+    readLock.acquire();
   }
 
   @Test
   public void testAcquireLockWithWaitRetries() throws Exception {
     when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, ACQUIRED);
-    lock.acquire();
-    assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
+    readLock.acquire();
+    assertEquals(Long.valueOf(LOCK_ID), readLock.getLockId());
   }
 
   @Test
   public void testReleaseLock() throws Exception {
-    lock.acquire();
-    lock.release();
+    readLock.acquire();
+    readLock.release();
     verify(mockMetaStoreClient).unlock(LOCK_ID);
   }
 
   @Test
   public void testReleaseLockNoLock() throws Exception {
-    lock.release();
+    readLock.release();
     verifyNoMoreInteractions(mockMetaStoreClient);
   }
 
   @Test
   public void testReleaseLockCancelsHeartbeat() throws Exception {
-    lock.acquire();
-    lock.release();
+    readLock.acquire();
+    readLock.release();
     verify(mockHeartbeat).cancel();
   }
 
   @Test
   public void testReadHeartbeat() throws Exception {
-    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, 
mockListener, null, TABLES, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, 
mockListener, null, SOURCES, LOCK_ID);
     task.run();
     verify(mockMetaStoreClient).heartbeat(0, LOCK_ID);
   }
 
   @Test
   public void testTxnHeartbeat() throws Exception {
-    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, 
mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, 
mockListener, TRANSACTION_ID, SOURCES,
+        LOCK_ID);
     task.run();
     verify(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
   }
@@ -232,43 +255,47 @@ public class TestLock {
   public void testReadHeartbeatFailsNoSuchLockException() throws Exception {
     Throwable t = new NoSuchLockException();
     doThrow(t).when(mockMetaStoreClient).heartbeat(0, LOCK_ID);
-    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, 
mockListener, null, TABLES, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, 
mockListener, null, SOURCES, LOCK_ID);
     task.run();
-    verify(mockListener).lockFailed(LOCK_ID, null, Lock.asStrings(TABLES), t);
+    verify(mockListener).lockFailed(LOCK_ID, null, Lock.asStrings(SOURCES), t);
   }
 
   @Test
   public void testTxnHeartbeatFailsNoSuchLockException() throws Exception {
     Throwable t = new NoSuchLockException();
     doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
-    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, 
mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, 
mockListener, TRANSACTION_ID, SOURCES,
+        LOCK_ID);
     task.run();
-    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, 
Lock.asStrings(TABLES), t);
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, 
Lock.asStrings(SOURCES), t);
   }
 
   @Test
   public void testHeartbeatFailsNoSuchTxnException() throws Exception {
     Throwable t = new NoSuchTxnException();
     doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
-    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, 
mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, 
mockListener, TRANSACTION_ID, SOURCES,
+        LOCK_ID);
     task.run();
-    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, 
Lock.asStrings(TABLES), t);
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, 
Lock.asStrings(SOURCES), t);
   }
 
   @Test
   public void testHeartbeatFailsTxnAbortedException() throws Exception {
     Throwable t = new TxnAbortedException();
     doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
-    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, 
mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, 
mockListener, TRANSACTION_ID, SOURCES,
+        LOCK_ID);
     task.run();
-    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, 
Lock.asStrings(TABLES), t);
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, 
Lock.asStrings(SOURCES), t);
   }
 
   @Test
   public void testHeartbeatContinuesTException() throws Exception {
     Throwable t = new TException();
     doThrow(t).when(mockMetaStoreClient).heartbeat(0, LOCK_ID);
-    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, 
mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, 
mockListener, TRANSACTION_ID, SOURCES,
+        LOCK_ID);
     task.run();
     verifyZeroInteractions(mockListener);
   }

Reply via email to