Repository: sqoop
Updated Branches:
  refs/heads/trunk 184452908 -> e13dd2120
SQOOP-3232: Remove Sqoop dependency on deprecated HBase APIs

(Szabolcs Vasas via Boglarka Egyed)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e13dd212
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e13dd212
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e13dd212

Branch: refs/heads/trunk
Commit: e13dd21209c26316d43350a23f5d533321b61352
Parents: 1844529
Author: Boglarka Egyed <[email protected]>
Authored: Mon Sep 11 14:15:46 2017 +0200
Committer: Boglarka Egyed <[email protected]>
Committed: Mon Sep 11 14:15:46 2017 +0200

----------------------------------------------------------------------
 .../apache/sqoop/hbase/HBasePutProcessor.java   |  99 +++++++++-----
 .../sqoop/mapreduce/HBaseBulkImportJob.java     |  60 ++++++---
 .../apache/sqoop/mapreduce/HBaseImportJob.java  |  69 +++-------
 .../com/cloudera/sqoop/hbase/HBaseTestCase.java |  31 +++--
 .../sqoop/hbase/TestHBasePutProcessor.java      | 133 +++++++++++++++++++
 5 files changed, 282 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
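For orientation before the per-file diffs: the patch consistently moves off the HTable-centric client API, deprecated since HBase 1.0, onto the Connection-based API. A minimal before/after sketch of the buffered write path (the table name, row, and cell below are illustrative placeholders, not values from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BufferedWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Put put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("val"));

        // Old style (deprecated, removed by this patch):
        //   HTable table = new HTable(conf, "demo_table");
        //   table.setAutoFlush(false);  // buffer writes client-side
        //   table.put(put);
        //   table.flushCommits();
        //   table.close();

        // New style: an explicit, heavyweight Connection plus a
        // BufferedMutator, which buffers mutations much like
        // autoFlush(false) used to.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             BufferedMutator mutator =
                 connection.getBufferedMutator(TableName.valueOf("demo_table"))) {
          mutator.mutate(put);  // queued locally
          mutator.flush();      // sent to the region servers, like flushCommits()
        }
      }
    }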

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e13dd212/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
index 032fd38..cf97b8a 100644
--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
@@ -25,8 +25,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -84,12 +87,19 @@ public class HBasePutProcessor implements Closeable, Configurable,
   // into a Put command.
   private PutTransformer putTransformer;
 
-  private String tableName;
-  private HTable table;
+  private Connection hbaseConnection;
+  private BufferedMutator bufferedMutator;
 
   public HBasePutProcessor() {
   }
 
+  HBasePutProcessor(Configuration conf, PutTransformer putTransformer, Connection hbaseConnection, BufferedMutator bufferedMutator) {
+    this.conf = conf;
+    this.putTransformer = putTransformer;
+    this.hbaseConnection = hbaseConnection;
+    this.bufferedMutator = bufferedMutator;
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void setConf(Configuration config) {
@@ -106,15 +116,24 @@ public class HBasePutProcessor implements Closeable, Configurable,
       throw new RuntimeException("Could not instantiate PutTransformer.");
     }
     putTransformer.init(conf);
+    initHBaseMutator();
+  }
 
-    this.tableName = conf.get(TABLE_NAME_KEY, null);
+  private void initHBaseMutator() {
+    String tableName = conf.get(TABLE_NAME_KEY, null);
     try {
-      this.table = new HTable(conf, this.tableName);
-    } catch (IOException ioe) {
-      throw new RuntimeException("Could not access HBase table " + tableName,
-          ioe);
+      hbaseConnection = ConnectionFactory.createConnection(conf);
+      bufferedMutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName));
+    } catch (IOException e) {
+      if (hbaseConnection != null) {
+        try {
+          hbaseConnection.close();
+        } catch (IOException connCloseException){
+          LOG.error("Cannot close HBase connection.", connCloseException);
+        }
+      }
+      throw new RuntimeException("Could not create mutator for HBase table " + tableName, e);
     }
-    this.table.setAutoFlush(false);
   }
 
   @Override
@@ -131,38 +150,54 @@ public class HBasePutProcessor implements Closeable, Configurable,
       throws IOException, ProcessingException {
     Map<String, Object> fields = record.getFieldMap();
     List<Mutation> mutationList = putTransformer.getMutationCommand(fields);
-    if (null != mutationList) {
-      for (Mutation mutation : mutationList) {
-        if (mutation!=null) {
-          if(mutation instanceof Put) {
-            Put putObject = (Put) mutation;
-            if (putObject.isEmpty()) {
-              LOG.warn("Could not insert row with no columns "
-                  + "for row-key column: " + Bytes.toString(putObject.getRow()));
-            } else {
-              this.table.put(putObject);
-            }
-          } else if(mutation instanceof Delete) {
-            Delete deleteObject = (Delete) mutation;
-            if (deleteObject.isEmpty()) {
-              LOG.warn("Could not delete row with no columns "
-                  + "for row-key column: " + Bytes.toString(deleteObject.getRow()));
-            } else {
-              this.table.delete(deleteObject);
-            }
-          }
-        }
+    if (mutationList == null) {
+      return;
+    }
+    for (Mutation mutation : mutationList) {
+      if (!canAccept(mutation)) {
+        continue;
+      }
+      if (!mutation.isEmpty()) {
+        bufferedMutator.mutate(mutation);
+      } else {
+        logEmptyMutation(mutation);
       }
     }
   }
 
+  private void logEmptyMutation(Mutation mutation) {
+    String action = null;
+    if (mutation instanceof Put) {
+      action = "insert";
+    } else if (mutation instanceof Delete) {
+      action = "delete";
+    }
+    LOG.warn("Could not " + action + " row with no columns "
+        + "for row-key column: " + Bytes.toString(mutation.getRow()));
+  }
+
+  private boolean canAccept(Mutation mutation) {
+    return mutation != null && (mutation instanceof Put || mutation instanceof Delete);
+  }
+
   @Override
   /**
    * Closes the HBase table and commits all pending operations.
   */
  public void close() throws IOException {
-    this.table.flushCommits();
-    this.table.close();
+    try {
+      bufferedMutator.flush();
+    } finally {
+      try {
+        bufferedMutator.close();
+      } finally {
+        try {
+          hbaseConnection.close();
+        } catch (IOException e) {
+          LOG.error("Cannot close HBase connection.", e);
+        }
+      }
+    }
   }
 }
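One detail worth noting in the close() above: flush() runs first, and the nested finally blocks guarantee that the mutator and then the connection are closed even when the flush fails. The same ordering can be expressed with try-with-resources, which closes resources in reverse declaration order; a sketch of that equivalent shape, not the committed code (unlike the patch, a close() failure here would propagate instead of being logged):

    @Override
    public void close() throws IOException {
      // Resources close in reverse order: the mutator first (BufferedMutator
      // also flushes any remaining buffered mutations on close), then the
      // connection - the same guarantee as the nested finally blocks above.
      try (Connection c = hbaseConnection;
           BufferedMutator m = bufferedMutator) {
        m.flush();
      }
    }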

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e13dd212/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
index 2bbfffe..ed89aeb 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
@@ -26,10 +26,14 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.mapreduce.Job;
@@ -50,6 +54,8 @@ public class HBaseBulkImportJob extends HBaseImportJob {
   public static final Log LOG = LogFactory.getLog(
       HBaseBulkImportJob.class.getName());
 
+  private Connection hbaseConnection;
+
   public HBaseBulkImportJob(final SqoopOptions opts,
       final ImportJobContext importContext) {
     super(opts, importContext);
@@ -81,8 +87,21 @@
     TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class);
     FileOutputFormat.setOutputPath(job, getContext().getDestination());
 
-    HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
-    HFileOutputFormat.configureIncrementalLoad(job, hTable);
+    TableName hbaseTableName = TableName.valueOf(options.getHBaseTable());
+    hbaseConnection = ConnectionFactory.createConnection(job.getConfiguration());
+
+    try (
+        Table hbaseTable = hbaseConnection.getTable(hbaseTableName)
+    ) {
+      HFileOutputFormat2.configureIncrementalLoad(job, hbaseTable, hbaseConnection.getRegionLocator(hbaseTableName));
+    } catch (IOException | RuntimeException e) {
+      try {
+        hbaseConnection.close();
+      } catch (IOException ioException) {
+        LOG.error("Cannot close HBase connection.", ioException);
+      }
+      throw e;
+    }
   }
 
   /**
@@ -99,15 +118,16 @@
       setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
         FsPermission.createImmutable((short) 00777));
 
-      HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
+      TableName hbaseTableName = TableName.valueOf(options.getHBaseTable());
 
       // Load generated HFiles into table
-      try {
-        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
-          job.getConfiguration());
-        loader.doBulkLoad(bulkLoadDir, hTable);
-      }
-      catch (Exception e) {
+      try (
+          Table hbaseTable = hbaseConnection.getTable(hbaseTableName);
+          Admin hbaseAdmin = hbaseConnection.getAdmin()
+      ) {
+        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(job.getConfiguration());
+        loader.doBulkLoad(bulkLoadDir, hbaseAdmin, hbaseTable, hbaseConnection.getRegionLocator(hbaseTableName));
+      } catch (Exception e) {
         String errorMessage = String.format("Unrecoverable error while " +
           "performing the bulk load of files in [%s]",
           bulkLoadDir.toString());
@@ -117,11 +137,19 @@
 
   @Override
   protected void jobTeardown(Job job) throws IOException, ImportException {
-    super.jobTeardown(job);
-    // Delete the hfiles directory after we are finished.
-    Path destination = getContext().getDestination();
-    FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
-    fileSystem.delete(destination, true);
+    try {
+      super.jobTeardown(job);
+      // Delete the hfiles directory after we are finished.
+      Path destination = getContext().getDestination();
+      FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
+      fileSystem.delete(destination, true);
+    } finally {
+      try {
+        hbaseConnection.close();
+      } catch (IOException e) {
+        LOG.error("Cannot close HBase connection.", e);
+      }
+    }
   }
 
   /**
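The bulk-load changes above reduce to two non-deprecated calls: HFileOutputFormat2.configureIncrementalLoad(...) before the MapReduce job runs, and LoadIncrementalHFiles.doBulkLoad(...) once the HFiles exist. A condensed, self-contained sketch of that sequence (the class and method names, table name, and paths are placeholders, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.mapreduce.Job;

    public class BulkLoadSketch {
      static void bulkLoad(Job job, String tableName, Path hfileDir) throws Exception {
        Configuration conf = job.getConfiguration();
        TableName name = TableName.valueOf(tableName);
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(name);
             Admin admin = conn.getAdmin()) {
          // 1. Make the job emit HFiles partitioned along the table's
          //    current region boundaries.
          HFileOutputFormat2.configureIncrementalLoad(job, table,
              conn.getRegionLocator(name));
          if (!job.waitForCompletion(true)) {
            throw new IllegalStateException("HFile-producing job failed");
          }
          // 2. Hand the finished HFiles over to the region servers.
          new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, admin, table,
              conn.getRegionLocator(name));
        }
      }
    }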

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e13dd212/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
index 523d0a7..5adb788 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
@@ -19,7 +19,6 @@
 package org.apache.sqoop.mapreduce;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
 import org.apache.commons.logging.Log;
@@ -28,10 +27,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
@@ -160,65 +163,28 @@ public class HBaseImportJob extends DataDrivenImportJob {
       HBaseConfiguration.addHbaseResources(conf);
     }
 
-    HBaseAdmin admin = new HBaseAdmin(conf);
+    Connection hbaseConnection = ConnectionFactory.createConnection(conf);
+    Admin admin = hbaseConnection.getAdmin();
 
     if (!skipDelegationTokens(conf)) {
-      // Add authentication token to the job if we're running on secure cluster.
-      //
-      // We're currently supporting HBase version 0.90 that do not have security
-      // patches which means that it do not have required methods
-      // "isSecurityEnabled" and "obtainAuthTokenForJob".
-      //
-      // We're using reflection API to see if those methods are available and call
-      // them only if they are present.
-      //
-      // After we will remove support for HBase 0.90 we can simplify the code to
-      // following code fragment:
-      /*
       try {
-        if (User.isSecurityEnabled()) {
-          User user = User.getCurrent();
-          user.obtainAuthTokenForJob(conf, job);
+        if (User.isHBaseSecurityEnabled(conf)) {
+          TokenUtil.obtainTokenForJob(hbaseConnection, User.getCurrent(), job);
         }
       } catch(InterruptedException ex) {
         throw new ImportException("Can't get authentication token", ex);
       }
-      */
-      try {
-        // Get method isSecurityEnabled
-        Method isHBaseSecurityEnabled = User.class.getMethod(
-            "isHBaseSecurityEnabled", Configuration.class);
-
-        // Get method obtainAuthTokenForJob
-        Method obtainAuthTokenForJob = User.class.getMethod(
-            "obtainAuthTokenForJob", Configuration.class, Job.class);
-
-        // Get current user
-        User user = User.getCurrent();
-
-        // Obtain security token if needed
-        if ((Boolean)isHBaseSecurityEnabled.invoke(null, conf)) {
-          obtainAuthTokenForJob.invoke(user, conf, job);
-        }
-      } catch (NoSuchMethodException e) {
-        LOG.info("It seems that we're running on HBase without security"
-            + " additions. Security additions will not be used during this job.");
-      } catch (InvocationTargetException e) {
-        throw new ImportException("Can't get authentication token", e);
-      } catch (IllegalAccessException e) {
-        throw new ImportException("Can't get authentication token", e);
-      }
     }
 
     // Check to see if the table exists.
     HTableDescriptor tableDesc = null;
     byte [] familyBytes = Bytes.toBytes(familyName);
     HColumnDescriptor colDesc = new HColumnDescriptor(familyBytes);
-    if (!admin.tableExists(tableName)) {
+    if (!admin.tableExists(TableName.valueOf(tableName))) {
       if (options.getCreateHBaseTable()) {
         // Create the table.
         LOG.info("Creating missing HBase table " + tableName);
-        tableDesc = new HTableDescriptor(tableName);
+        tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
         tableDesc.addFamily(colDesc);
         admin.createTable(tableDesc);
       } else {
@@ -228,16 +194,16 @@
       }
     } else {
       // Table exists, so retrieve their current version
-      tableDesc = admin.getTableDescriptor(Bytes.toBytes(tableName));
+      tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName));
 
       // Check if current version do have specified column family
       if (!tableDesc.hasFamily(familyBytes)) {
         if (options.getCreateHBaseTable()) {
           // Create the column family.
           LOG.info("Creating missing column family " + familyName);
-          admin.disableTable(tableName);
-          admin.addColumn(tableName, colDesc);
-          admin.enableTable(tableName);
+          admin.disableTable(TableName.valueOf(tableName));
+          admin.addColumn(TableName.valueOf(tableName), colDesc);
+          admin.enableTable(TableName.valueOf(tableName));
         } else {
           LOG.warn("Could not find column family " + familyName + " in table "
             + tableName);
@@ -250,10 +216,11 @@
     // Make sure we close the connection to HBA, this is only relevant in
     // unit tests
     admin.close();
+    hbaseConnection.close();
 
     // Make sure HBase libraries are shipped as part of the job.
     TableMapReduceUtil.addDependencyJars(job);
-    TableMapReduceUtil.addDependencyJars(conf, HTable.class);
+    TableMapReduceUtil.addDependencyJars(conf, Table.class);
 
     super.jobSetup(job);
   }
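With HBase 0.90 support gone, the reflection probing removed above collapses into two direct calls that are public API in the HBase versions Sqoop now targets. Isolated for readability (conf, hbaseConnection, and job come from the surrounding method in the diff):

    // Only relevant when hbase-site.xml configures Kerberos authentication.
    if (User.isHBaseSecurityEnabled(conf)) {
      // Obtain an HBase delegation token and attach it to the job's
      // credentials so map tasks can authenticate without a keytab.
      TokenUtil.obtainTokenForJob(hbaseConnection, User.getCurrent(), job);
    }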

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e13dd212/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
index 8b29b5f..cf42d31 100644
--- a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
+++ b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
@@ -39,8 +39,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
@@ -206,9 +209,10 @@
       String colFamily, String colName, String val) throws IOException {
     Get get = new Get(Bytes.toBytes(rowKey));
     get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(colName));
-    HTable table = new HTable(new Configuration(
-        hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName));
-    try {
+    try (
+        Connection hbaseConnection = createHBaseConnection();
+        Table table = getHBaseTable(hbaseConnection, tableName)
+    ) {
       Result r = table.get(get);
       byte [] actualVal = r.getValue(Bytes.toBytes(colFamily),
           Bytes.toBytes(colName));
@@ -218,29 +222,34 @@ public abstract class HBaseTestCase extends ImportJobTestCase {
         assertNotNull("No result, but we expected one", actualVal);
         assertEquals(val, Bytes.toString(actualVal));
       }
-    } finally {
-      table.close();
     }
   }
 
   protected int countHBaseTable(String tableName, String colFamily)
       throws IOException {
     int count = 0;
-    HTable table = new HTable(new Configuration(
-        hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName));
-    try {
+    try (
+        Connection hbaseConnection = createHBaseConnection();
+        Table table = getHBaseTable(hbaseConnection, tableName)
+    ) {
       ResultScanner scanner = table.getScanner(Bytes.toBytes(colFamily));
       for(Result result = scanner.next();
          result != null;
          result = scanner.next()) {
        count++;
      }
-    } finally {
-      table.close();
     }
     return count;
   }
 
+  private Connection createHBaseConnection() throws IOException {
+    return ConnectionFactory.createConnection(new Configuration(hbaseTestUtil.getConfiguration()));
+  }
+
+  private Table getHBaseTable(Connection connection, String tableName) throws IOException {
+    return connection.getTable(TableName.valueOf(tableName));
+  }
+
   protected boolean isKerberized() {
     return kerberosConfigurationProvider != null;
   }
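The package-private constructor added to HBasePutProcessor earlier in this patch pays off in the new unit test below: collaborators can be injected as mocks, so accept() is exercised without a live HBase cluster. In outline (Mockito, mirroring the test that follows):

    BufferedMutator mutator = mock(BufferedMutator.class);
    HBasePutProcessor processor = new HBasePutProcessor(
        mock(Configuration.class), mock(PutTransformer.class),
        mock(Connection.class), mutator);
    // processor.accept(record) now routes mutations to the mock only.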

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e13dd212/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java b/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java
new file mode 100644
index 0000000..73b3177
--- /dev/null
+++ b/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.hbase;
+
+import com.cloudera.sqoop.lib.FieldMappable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.sqoop.util.ExpectedLogMessage;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static java.util.Collections.singletonList;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class TestHBasePutProcessor {
+
+  @Rule
+  public ExpectedLogMessage expectedLogMessage = new ExpectedLogMessage();
+
+  private Configuration configuration;
+  private Connection hbaseConnection;
+  private PutTransformer putTransformer;
+  private BufferedMutator bufferedMutator;
+  private FieldMappable fieldMappable;
+
+  private HBasePutProcessor hBasePutProcessor;
+
+  @Before
+  public void before() {
+    configuration = mock(Configuration.class);
+    hbaseConnection = mock(Connection.class);
+    putTransformer = mock(PutTransformer.class);
+    bufferedMutator = mock(BufferedMutator.class);
+    fieldMappable = mock(FieldMappable.class);
+
+    hBasePutProcessor = new HBasePutProcessor(configuration, putTransformer, hbaseConnection, bufferedMutator);
+  }
+
+  @Test
+  public void testNoMutationIsDoneWhenNullListIsReceived() throws Exception {
+    when(putTransformer.getMutationCommand(anyMap())).thenReturn(null);
+    verifyNoMoreInteractions(bufferedMutator);
+
+    hBasePutProcessor.accept(fieldMappable);
+  }
+
+  @Test
+  public void testNoMutationIsDoneWhenListContainingNullsIsReceived() throws Exception {
+    List<Mutation> inputList = Arrays.asList(null, null, null);
+    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+    verifyNoMoreInteractions(bufferedMutator);
+
+    hBasePutProcessor.accept(fieldMappable);
+  }
+
+  @Test
+  public void testNoMutationIsDoneWhenListContainingUnknownMutationIsReceived() throws Exception {
+    List<Mutation> inputList = singletonList(mock(Mutation.class));
+    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+    verifyNoMoreInteractions(bufferedMutator);
+
+    hBasePutProcessor.accept(fieldMappable);
+  }
+
+  @Test
+  public void testWarningIsLoggedWhenListContainingEmptyPutIsReceived() throws Exception {
+    Mutation emptyPutMutation = mock(Put.class);
+    when(emptyPutMutation.getRow()).thenReturn("emptyPutMutation".getBytes());
+    when(emptyPutMutation.isEmpty()).thenReturn(true);
+    List<Mutation> inputList = singletonList(emptyPutMutation);
+    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+    verifyNoMoreInteractions(bufferedMutator);
+    expectedLogMessage.expectWarn("Could not insert row with no columns for row-key column: emptyPutMutation");
+
+    hBasePutProcessor.accept(fieldMappable);
+  }
+
+  @Test
+  public void testWarningIsLoggedWhenListContainingEmptyDeleteIsReceived() throws Exception {
+    Mutation emptyDeleteMutation = mock(Delete.class);
+    when(emptyDeleteMutation.getRow()).thenReturn("emptyDeleteMutation".getBytes());
+    when(emptyDeleteMutation.isEmpty()).thenReturn(true);
+    List<Mutation> inputList = singletonList(emptyDeleteMutation);
+    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+    verifyNoMoreInteractions(bufferedMutator);
+    expectedLogMessage.expectWarn("Could not delete row with no columns for row-key column: emptyDeleteMutation");
+
+    hBasePutProcessor.accept(fieldMappable);
+  }
+
+  @Test
+  public void testMutationIsDoneForAllElementsWhenListContainingValidMutationsIsReceived() throws Exception {
+    Mutation aPutMutation = mock(Put.class);
+    Mutation aDeleteMutation = mock(Delete.class);
+    Mutation anotherPutMutation = mock(Put.class);
+    List<Mutation> inputList = Arrays.asList(aPutMutation, aDeleteMutation, anotherPutMutation);
+    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+
+    hBasePutProcessor.accept(fieldMappable);
+
+    verify(bufferedMutator).mutate(aPutMutation);
+    verify(bufferedMutator).mutate(aDeleteMutation);
+    verify(bufferedMutator).mutate(anotherPutMutation);
+  }
+
+}
\ No newline at end of file
