SQOOP-3149: Sqoop incremental import - NULL column updates are not pulled into HBase table
(Jilani Shaik via Anna Szonyi) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/4ab7b60c Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/4ab7b60c Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/4ab7b60c Branch: refs/heads/branch-1.4.7 Commit: 4ab7b60caf2c3d9fda72c85e6427912584986970 Parents: dffb2f9 Author: Anna Szonyi <[email protected]> Authored: Thu Aug 3 15:25:36 2017 +0200 Committer: Attila Szabo <[email protected]> Committed: Mon Oct 30 14:46:33 2017 +0100 ---------------------------------------------------------------------- .../apache/sqoop/hbase/HBasePutProcessor.java | 34 +++-- .../org/apache/sqoop/hbase/PutTransformer.java | 5 +- .../sqoop/hbase/ToStringPutTransformer.java | 31 +++-- .../sqoop/mapreduce/HBaseBulkImportMapper.java | 10 +- .../cloudera/sqoop/hbase/HBaseImportTest.java | 54 ++++++++ .../com/cloudera/sqoop/hbase/HBaseTestCase.java | 58 +++++--- .../sqoop/testutil/BaseSqoopTestCase.java | 135 ++++++++++++++++++- 7 files changed, 282 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ab7b60c/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java index fdbe127..032fd38 100644 --- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java +++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java @@ -25,7 +25,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.ReflectionUtils; @@ -128,17 +130,27 @@ public class HBasePutProcessor implements Closeable, Configurable, public void accept(FieldMappable record) throws IOException, ProcessingException { Map<String, Object> fields = record.getFieldMap(); - - List<Put> putList = putTransformer.getPutCommand(fields); - if (null != putList) { - for (Put put : putList) { - if (put!=null) { - if (put.isEmpty()) { - LOG.warn("Could not insert row with no columns " - + "for row-key column: " + Bytes.toString(put.getRow())); - } else { - this.table.put(put); - } + List<Mutation> mutationList = putTransformer.getMutationCommand(fields); + if (null != mutationList) { + for (Mutation mutation : mutationList) { + if (mutation!=null) { + if(mutation instanceof Put) { + Put putObject = (Put) mutation; + if (putObject.isEmpty()) { + LOG.warn("Could not insert row with no columns " + + "for row-key column: " + Bytes.toString(putObject.getRow())); + } else { + this.table.put(putObject); + } + } else if(mutation instanceof Delete) { + Delete deleteObject = (Delete) mutation; + if (deleteObject.isEmpty()) { + LOG.warn("Could not delete row with no columns " + + "for row-key column: " + Bytes.toString(deleteObject.getRow())); + } else { + this.table.delete(deleteObject); + } + } } } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ab7b60c/src/java/org/apache/sqoop/hbase/PutTransformer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/hbase/PutTransformer.java b/src/java/org/apache/sqoop/hbase/PutTransformer.java index 533467e..c4496ee 100644 --- a/src/java/org/apache/sqoop/hbase/PutTransformer.java +++ b/src/java/org/apache/sqoop/hbase/PutTransformer.java @@ -22,9 +22,8 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Mutation; /** * Interface that takes a map of jdbc field names to values @@ -71,7 +70,7 @@ public abstract class PutTransformer { * @param fields a map of field names to values to insert. * @return A list of Put commands that inserts these into HBase. */ - public abstract List<Put> getPutCommand(Map<String, Object> fields) + public abstract List<Mutation> getMutationCommand(Map<String, Object> fields) throws IOException; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ab7b60c/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java index 363e145..20bf1b9 100644 --- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java +++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java @@ -22,6 +22,8 @@ import com.cloudera.sqoop.hbase.PutTransformer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.StringUtils; @@ -106,7 +108,7 @@ public class ToStringPutTransformer extends PutTransformer { @Override /** {@inheritDoc} */ - public List<Put> getPutCommand(Map<String, Object> fields) + public List<Mutation> getMutationCommand(Map<String, Object> fields) throws IOException { String rowKeyCol = getRowKeyColumn(); @@ -140,7 +142,7 @@ public class ToStringPutTransformer extends PutTransformer { // from composite key String compositeRowKey = StringUtils.join(DELIMITER_HBASE, rowKeyList); // Insert record in HBase - return putRecordInHBase(fields, colFamily, compositeRowKey); + return mutationRecordInHBase(fields, colFamily, compositeRowKey); } else { // if row-key is regular primary key @@ -154,23 +156,21 @@ public class ToStringPutTransformer extends PutTransformer { } String hBaseRowKey = toHBaseString(rowKey); - return putRecordInHBase(fields, colFamily, hBaseRowKey); + return mutationRecordInHBase(fields, colFamily, hBaseRowKey); } } /** - * Performs actual Put operation for the specified record in HBase. + * Performs actual Put/delete operation for the specified record in HBase. * @param record * @param colFamily * @param rowKey - * @return List containing a single put command + * @return List containing a put/delete command */ - private List<Put> putRecordInHBase(Map<String, Object> record, + private List<Mutation> mutationRecordInHBase(Map<String, Object> record, String colFamily, String rowKey) { - // Put row-key in HBase - Put put = new Put(Bytes.toBytes(rowKey)); byte[] colFamilyBytes = Bytes.toBytes(colFamily); - + List<Mutation> mutationList = new ArrayList<Mutation>(); for (Map.Entry<String, Object> fieldEntry : record.entrySet()) { String colName = fieldEntry.getKey(); boolean rowKeyCol = false; @@ -187,17 +187,24 @@ public class ToStringPutTransformer extends PutTransformer { // check addRowKey flag before including rowKey field. Object val = fieldEntry.getValue(); if (null != val) { + // Put row-key in HBase + Put put = new Put(Bytes.toBytes(rowKey)); if ( val instanceof byte[]) { - put.add(colFamilyBytes, getFieldNameBytes(colName), + put.addColumn(colFamilyBytes, getFieldNameBytes(colName), (byte[])val); } else { - put.add(colFamilyBytes, getFieldNameBytes(colName), + put.addColumn(colFamilyBytes, getFieldNameBytes(colName), Bytes.toBytes(toHBaseString(val))); } + mutationList.add(put); + } else { + Delete delete = new Delete(Bytes.toBytes(rowKey)); + delete.addColumn(colFamilyBytes, getFieldNameBytes(colName)); + mutationList.add(delete); } } } - return Collections.singletonList(put); + return Collections.unmodifiableList(mutationList); } private String toHBaseString(Object val) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ab7b60c/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java index 58ccee7..4b583dd 100644 --- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.LongWritable; @@ -79,9 +80,12 @@ public class HBaseBulkImportMapper } Map<String, Object> fields = val.getFieldMap(); - List<Put> putList = putTransformer.getPutCommand(fields); - for(Put put: putList){ - context.write(new ImmutableBytesWritable(put.getRow()), put); + List<Mutation> mutationList = putTransformer.getMutationCommand(fields); + for(Mutation mutation: mutationList){ + if(mutation != null && mutation instanceof Put) { + Put putObject = (Put) mutation; + context.write(new ImmutableBytesWritable(putObject.getRow()), putObject); + } } } @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ab7b60c/src/test/com/cloudera/sqoop/hbase/HBaseImportTest.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseImportTest.java b/src/test/com/cloudera/sqoop/hbase/HBaseImportTest.java index fa14a01..4d79341 100644 --- a/src/test/com/cloudera/sqoop/hbase/HBaseImportTest.java +++ b/src/test/com/cloudera/sqoop/hbase/HBaseImportTest.java @@ -72,6 +72,60 @@ public class HBaseImportTest extends HBaseTestCase { } @Test + public void testOverwriteNullColumnsSucceeds() throws IOException { + // Test that we can create a table and then import immediately + // back on top of it without problem and then update with null to validate + String [] argv = getArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null); + String [] types = { "INT", "INT", "INT", "DATETIME" }; + String [] vals = { "0", "1", "1", "'2017-03-20'" }; + createTableWithColTypes(types, vals); + runImport(argv); + verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(2), "1"); + // Run a second time. + argv = getIncrementalArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null, false, false, "DATA_COL3", "2017-03-24 01:01:01.0", null); + vals = new String[] { "0", "1", null, "'2017-03-25'" }; + updateTable(types, vals); + runImport(argv); + verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(2), null); + } + + @Test + public void testAppendWithTimestampSucceeds() throws IOException { + // Test that we can create a table and then import multiple rows + // validate for append scenario with time stamp + String [] argv = getArgv(true, "AppendTable", "AppendColumnFamily", true, null); + String [] types = { "INT", "INT", "INT", "DATETIME" }; + String [] vals = { "0", "1", "1", "'2017-03-20'" }; + createTableWithColTypes(types, vals); + runImport(argv); + verifyHBaseCell("AppendTable", "0", "AppendColumnFamily", getColName(2), "1"); + // Run a second time. + argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, false, "DATA_COL1", "2017-03-24 01:01:01.0", null); + vals = new String[] { "1", "2", "3", "'2017-06-15'" }; + insertIntoTable(types, vals); + runImport(argv); + verifyHBaseCell("AppendTable", "1", "AppendColumnFamily", getColName(2), "3"); + } + + @Test + public void testAppendSucceeds() throws IOException { + // Test that we can create a table and then import multiple rows + // validate for append scenario with ID column(DATA_COL3) + String [] argv = getArgv(true, "AppendTable", "AppendColumnFamily", true, null); + String [] types = { "INT", "INT", "INT", "DATETIME" }; + String [] vals = { "0", "1", "1", "'2017-03-20'" }; + createTableWithColTypes(types, vals); + runImport(argv); + verifyHBaseCell("AppendTable", "0", "AppendColumnFamily", getColName(2), "1"); + // Run a second time. + argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, true, "DATA_COL1", null, "DATA_COL3"); + vals = new String[] { "1", "2", "3", "'2017-06-15'" }; + insertIntoTable(types, vals); + runImport(argv); + verifyHBaseCell("AppendTable", "1", "AppendColumnFamily", getColName(2), "3"); + } + + @Test public void testExitFailure() throws IOException { String [] types = { "INT", "INT", "INT" }; String [] vals = { "0", "42", "43" }; http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ab7b60c/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java index a054eb6..d9f7495 100644 --- a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java +++ b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java @@ -18,43 +18,40 @@ package com.cloudera.sqoop.hbase; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.File; import java.io.IOException; +import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.conf.Configuration; - import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; - import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.util.Bytes; - +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.util.StringUtils; - import org.junit.After; import org.junit.Before; import com.cloudera.sqoop.testutil.CommonArgs; import com.cloudera.sqoop.testutil.HsqldbTestServer; import com.cloudera.sqoop.testutil.ImportJobTestCase; -import java.io.File; -import java.lang.reflect.Method; -import java.util.UUID; -import org.apache.commons.io.FileUtils; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; /** * Utility methods that facilitate HBase import tests. @@ -115,7 +112,38 @@ public abstract class HBaseTestCase extends ImportJobTestCase { if (hbaseCreate) { args.add("--hbase-create-table"); } + return args.toArray(new String[0]); + } + + /** + * Create the argv to pass to Sqoop as incremental options. + * @return the argv as an array of strings. + */ + protected String [] getIncrementalArgv(boolean includeHadoopFlags, + String hbaseTable, String hbaseColFam, boolean hbaseCreate, + String queryStr, boolean isAppend, boolean appendTimestamp, String checkColumn, String checkValue, String lastModifiedColumn) { + + String[] argsStrArray = getArgv(includeHadoopFlags, hbaseTable, hbaseColFam, hbaseCreate, queryStr); + List<String> args = new ArrayList<String>(Arrays.asList(argsStrArray)); + if (isAppend) { + args.add("--incremental"); + args.add("append"); + if (!appendTimestamp) { + args.add("--check-column"); + args.add(checkColumn);//"ID"); + } else { + args.add("--check-column"); + args.add(lastModifiedColumn);//LAST_MODIFIED"); + } + } else { + args.add("--incremental"); + args.add("lastmodified"); + args.add("--check-column"); + args.add(checkColumn); + args.add("--last-value"); + args.add(checkValue); + } return args.toArray(new String[0]); } // Starts a mini hbase cluster in this process. http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ab7b60c/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java index 6310a39..8cbb37e 100644 --- a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java +++ b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java @@ -383,7 +383,7 @@ public abstract class BaseSqoopTestCase { ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); statement.executeUpdate(); } catch (SQLException sqlException) { - fail("Could not create table: " + fail("Could not insert into table: " + StringUtils.stringifyException(sqlException)); } finally { if (null != statement) { @@ -413,6 +413,139 @@ public abstract class BaseSqoopTestCase { } /** + * insert into a table with a set of columns values for a given row. + * @param colTypes the types of the columns to make + * @param vals the SQL text for each value to insert + */ + protected void insertIntoTable(String[] colTypes, String[] vals) { + assert colNames != null; + assert colNames.length == vals.length; + + Connection conn = null; + PreparedStatement statement = null; + + String[] colNames = new String[vals.length]; + for( int i = 0; i < vals.length; i++) { + colNames[i] = BASE_COL_NAME + Integer.toString(i); + } + try { + conn = getManager().getConnection(); + for (int count=0; vals != null && count < vals.length/colTypes.length; + ++count ) { + String columnListStr = ""; + String valueListStr = ""; + for (int i = 0; i < colTypes.length; i++) { + columnListStr += manager.escapeColName(colNames[i].toUpperCase()); + valueListStr += vals[count * colTypes.length + i]; + if (i < colTypes.length - 1) { + columnListStr += ", "; + valueListStr += ", "; + } + } + try { + String insertValsStr = "INSERT INTO " + manager.escapeTableName(getTableName()) + "(" + columnListStr + ")" + + " VALUES(" + valueListStr + ")"; + LOG.info("Inserting values: " + insertValsStr); + statement = conn.prepareStatement( + insertValsStr, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + statement.executeUpdate(); + } catch (SQLException sqlException) { + fail("Could not insert into table: " + + StringUtils.stringifyException(sqlException)); + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException se) { + // Ignore exception on close. + } + + statement = null; + } + } + } + conn.commit(); + this.colNames = colNames; + } catch (SQLException se) { + if (null != conn) { + try { + conn.close(); + } catch (SQLException connSE) { + // Ignore exception on close. + } + } + fail("Could not create table: " + StringUtils.stringifyException(se)); + } + + } + + /** + * update a table with a set of columns values for a given row. + * @param colTypes the types of the columns to make + * @param vals the SQL text for each value to insert + */ + protected void updateTable(String[] colTypes, String[] vals) { + assert colNames != null; + assert colNames.length == vals.length; + + Connection conn = null; + PreparedStatement statement = null; + + String[] colNames = new String[vals.length]; + for( int i = 0; i < vals.length; i++) { + colNames[i] = BASE_COL_NAME + Integer.toString(i); + } + + try { + conn = getManager().getConnection(); + for (int count=0; vals != null && count < vals.length/colNames.length; + ++count ) { + String updateStr = ""; + for (int i = 1; i < colNames.length; i++) { + updateStr += manager.escapeColName(colNames[i].toUpperCase()) + " = "+vals[count * colNames.length + i]; + if (i < colNames.length - 1) { + updateStr += ", "; + } + } + updateStr += " WHERE "+colNames[0]+"="+vals[0]+""; + try { + String updateValsStr = "UPDATE " + manager.escapeTableName(getTableName()) + " SET " + updateStr; + LOG.info("updating values: " + updateValsStr); + statement = conn.prepareStatement( + updateValsStr, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + statement.executeUpdate(); + } catch (SQLException sqlException) { + fail("Could not update table: " + + StringUtils.stringifyException(sqlException)); + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException se) { + // Ignore exception on close. + } + statement = null; + } + } + } + + conn.commit(); + this.colNames = colNames; + } catch (SQLException se) { + if (null != conn) { + try { + conn.close(); + } catch (SQLException connSE) { + // Ignore exception on close. + } + } + fail("Could not update table: " + StringUtils.stringifyException(se)); + } + } + + /** * Create a table with a set of columns and add a row of values. * @param colTypes the types of the columns to make * @param vals the SQL text for each value to insert
