Repository: sqoop
Updated Branches:
  refs/heads/trunk 0f13c474b -> b4afcf417
SQOOP-2952: Fixing bug (row key not added into column family using --hbase-bulkload) (Szabolcs Vasas via Attila Szabo) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b4afcf41 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b4afcf41 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b4afcf41 Branch: refs/heads/trunk Commit: b4afcf4179b13c25b5e9bd182d75cab5d2e6c8d1 Parents: 0f13c47 Author: Attila Szabo <[email protected]> Authored: Thu Oct 13 14:38:21 2016 +0200 Committer: Attila Szabo <[email protected]> Committed: Thu Oct 13 14:38:21 2016 +0200 ---------------------------------------------------------------------- build.xml | 14 ++ ivy.xml | 23 ++++ .../apache/sqoop/hbase/HBasePutProcessor.java | 32 ++--- .../org/apache/sqoop/hbase/PutTransformer.java | 4 + .../sqoop/hbase/ToStringPutTransformer.java | 30 ++++- .../sqoop/mapreduce/HBaseBulkImportMapper.java | 3 +- .../sqoop/hbase/HBaseImportAddRowKeyTest.java | 128 ++++++++++++++----- .../com/cloudera/sqoop/hbase/HBaseTestCase.java | 25 ++-- 8 files changed, 175 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 97e5502..7f948b3 100644 --- a/build.xml +++ b/build.xml @@ -185,6 +185,20 @@ <property name="avrohadoopprofile" value="2" /> </then> </elseif> + + <!-- hadoopversion 260 is created for testing purposes only. Do not use it in production! 
--> + <elseif> + <equals arg1="${hadoopversion}" arg2="260" /> + <then> + <property name="hadoop.version" value="2.6.0" /> + <property name="hbase95.version" value="1.2.0" /> + <property name="zookeeper.version" value="3.4.5" /> + <property name="hadoop.version.full" value="2.6.0" /> + <property name="hcatalog.version" value="0.13.0" /> + <property name="hbasecompatprofile" value="2" /> + <property name="avrohadoopprofile" value="2" /> + </then> + </elseif> <else> <fail message="Unrecognized hadoopversion. Can only be 20, 23, 100, 200 or 210." /> </else> http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/ivy.xml ---------------------------------------------------------------------- diff --git a/ivy.xml b/ivy.xml index a502530..ee1dafa 100644 --- a/ivy.xml +++ b/ivy.xml @@ -55,6 +55,8 @@ under the License. extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" /> <conf name="hadoop210" visibility="private" extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" /> + <conf name="hadoop260" visibility="private" + extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" /> <conf name="test" visibility="private" extends="common,runtime"/> <conf name="hadoop23test" visibility="private" extends="test,hadoop23" /> @@ -62,6 +64,7 @@ under the License. <conf name="hadoop100test" visibility="private" extends="test,hadoop100" /> <conf name="hadoop200test" visibility="private" extends="test,hadoop200" /> <conf name="hadoop210test" visibility="private" extends="test,hadoop210" /> + <conf name="hadoop260test" visibility="private" extends="test,hadoop260" /> <!-- We don't redistribute everything we depend on (e.g., Hadoop itself); anything which Hadoop itself also depends on, we do not ship. @@ -105,6 +108,26 @@ under the License. 
<dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}" conf="hadoop210->default"/> + <!-- Dependencies for Hadoop 2.6.0 --> + <dependency org="org.apache.hadoop" name="hadoop-common" + rev="${hadoop.version}" conf="hadoop260->default"> + <artifact name="hadoop-common" type="jar" /> + <artifact name="hadoop-common" type="jar" m:classifier="tests"/> + </dependency> + <dependency org="org.apache.hadoop" name="hadoop-hdfs" + rev="${hadoop.version}" conf="hadoop260->default"> + <artifact name="hadoop-hdfs" type="jar" /> + <artifact name="hadoop-hdfs" type="jar" m:classifier="tests"/> + </dependency> + <dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-common" + rev="${hadoop.version}" conf="hadoop260->default"/> + <dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core" + rev="${hadoop.version}" conf="hadoop260->default"/> + <dependency org="org.aspectj" name="aspectjtools" rev="${aspectj.version}" + conf="hadoop260->default"/> + <dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}" + conf="hadoop260->default"/> + <!-- Dependencies for Hadoop 2.0.0 --> <dependency org="org.apache.hadoop" name="hadoop-common" rev="${hadoop.version}" conf="hadoop200->default"> http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java index b2431ac..fdbe127 100644 --- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java +++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java @@ -18,11 +18,9 @@ package org.apache.sqoop.hbase; -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.Map; - +import com.cloudera.sqoop.lib.FieldMapProcessor; +import com.cloudera.sqoop.lib.FieldMappable; +import 
com.cloudera.sqoop.lib.ProcessingException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; @@ -31,11 +29,11 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.sqoop.mapreduce.ImportJobBase; -import com.cloudera.sqoop.lib.FieldMappable; -import com.cloudera.sqoop.lib.FieldMapProcessor; -import com.cloudera.sqoop.lib.ProcessingException; +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; /** * SqoopRecordProcessor that performs an HBase "put" operation @@ -105,21 +103,7 @@ public class HBasePutProcessor implements Closeable, Configurable, if (null == putTransformer) { throw new RuntimeException("Could not instantiate PutTransformer."); } - - this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null)); - this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null)); - - if (this.putTransformer instanceof ToStringPutTransformer) { - ToStringPutTransformer stringPutTransformer = - (ToStringPutTransformer) this.putTransformer; - stringPutTransformer.bigDecimalFormatString = - conf.getBoolean(ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, - ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT); - stringPutTransformer.addRowKey = - conf.getBoolean(HBasePutProcessor.ADD_ROW_KEY, - HBasePutProcessor.ADD_ROW_KEY_DEFAULT); - stringPutTransformer.detectCompositeKey(); - } + putTransformer.init(conf); this.tableName = conf.get(TABLE_NAME_KEY, null); try { http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/java/org/apache/sqoop/hbase/PutTransformer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/hbase/PutTransformer.java b/src/java/org/apache/sqoop/hbase/PutTransformer.java index 8d6bcac..533467e 100644 --- 
a/src/java/org/apache/sqoop/hbase/PutTransformer.java +++ b/src/java/org/apache/sqoop/hbase/PutTransformer.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; /** @@ -33,6 +35,8 @@ public abstract class PutTransformer { private String columnFamily; private String rowKeyColumn; + public abstract void init(Configuration conf); + /** * @return the default column family to insert into. */ http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java index b5cad1d..363e145 100644 --- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java +++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java @@ -18,6 +18,15 @@ package org.apache.sqoop.hbase; +import com.cloudera.sqoop.hbase.PutTransformer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; +import org.apache.sqoop.mapreduce.ImportJobBase; + import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; @@ -27,13 +36,10 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.StringUtils; - -import com.cloudera.sqoop.hbase.PutTransformer; +import static org.apache.sqoop.hbase.HBasePutProcessor.ADD_ROW_KEY; +import static 
org.apache.sqoop.hbase.HBasePutProcessor.ADD_ROW_KEY_DEFAULT; +import static org.apache.sqoop.hbase.HBasePutProcessor.COL_FAMILY_KEY; +import static org.apache.sqoop.hbase.HBasePutProcessor.ROW_KEY_COLUMN_KEY; /** * PutTransformer that calls toString on all non-null fields. @@ -204,4 +210,14 @@ public class ToStringPutTransformer extends PutTransformer { return valString; } + @Override + public void init(Configuration conf) { + setColumnFamily(conf.get(COL_FAMILY_KEY, null)); + setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null)); + + this.bigDecimalFormatString = conf.getBoolean(ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, + ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT); + this.addRowKey = conf.getBoolean(ADD_ROW_KEY, ADD_ROW_KEY_DEFAULT); + detectCompositeKey(); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java index 363b5d7..58ccee7 100644 --- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java @@ -66,8 +66,7 @@ public class HBaseBulkImportMapper if (null == putTransformer) { throw new RuntimeException("Could not instantiate PutTransformer."); } - this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null)); - this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null)); + putTransformer.init(conf); } @Override public void map(LongWritable key, SqoopRecord val, Context context) http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java 
b/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java index cfbb1d3..abf9f1c 100644 --- a/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java +++ b/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java @@ -18,68 +18,126 @@ package com.cloudera.sqoop.hbase; +import junit.framework.JUnit4TestAdapter; +import org.apache.commons.lang.StringUtils; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; -import org.junit.Test; +import static java.util.Arrays.asList; +import static org.apache.commons.lang.StringUtils.join; -/** - * - */ +@RunWith(Parameterized.class) public class HBaseImportAddRowKeyTest extends HBaseTestCase { + @Parameterized.Parameters(name = "bulkLoad = {0}") + public static Iterable<? extends Object> bulkLoadParameters() { + return Arrays.asList(new Boolean[] { false } , new Boolean[] { true } ); + } + + private String[] columnTypes; + + private String[] columnValues; + + private String hbaseTableName; + + private String hbaseColumnFamily; + + private String hbaseTmpDir; + + private String hbaseBulkLoadDir; + + private boolean bulkLoad; + + public HBaseImportAddRowKeyTest(boolean bulkLoad) { + this.bulkLoad = bulkLoad; + } + + @Before + public void setUp() { + super.setUp(); + columnTypes = new String[] { "INT", "INT" }; + columnValues = new String[] { "0", "1" }; + hbaseTableName = "addRowKeyTable"; + hbaseColumnFamily = "addRowKeyFamily"; + hbaseTmpDir = TEMP_BASE_DIR + "hbaseTmpDir"; + hbaseBulkLoadDir = TEMP_BASE_DIR + "hbaseBulkLoadDir"; + createTableWithColTypes(columnTypes, columnValues); + } + @Test public void testAddRowKey() throws IOException { - String[] types = { "INT", "INT" }; - String[] vals = { "0", "1" }; - createTableWithColTypes(types, vals); - - String[] otherArg = getArgv(true, "addRowKeyT", "addRowKeyF", true, null); - 
String[] argv = new String[otherArg.length + 2]; - argv[0] = "-D"; - argv[1] = "sqoop.hbase.add.row.key=true"; - System.arraycopy(otherArg, 0, argv, 2, otherArg.length); + String[] argv = getImportArguments(true, hbaseTableName, hbaseColumnFamily); runImport(argv); // Row key should have been added - verifyHBaseCell("addRowKeyT", "0", "addRowKeyF", getColName(0), "0"); - verifyHBaseCell("addRowKeyT", "0", "addRowKeyF", getColName(1), "1"); + verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(0), columnValues[0]); + verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(1), columnValues[1]); } @Test public void testAddRowKeyDefault() throws IOException { - String[] types = { "INT", "INT" }; - String[] vals = { "0", "1" }; - createTableWithColTypes(types, vals); - - String[] argv = getArgv(true, "addRowKeyDfT", "addRowKeyDfF", true, null); + String[] argv = getImportArguments(false, hbaseTableName, hbaseColumnFamily); runImport(argv); // Row key should not be added by default - verifyHBaseCell("addRowKeyDfT", "0", "addRowKeyDfF", getColName(0), null); - verifyHBaseCell("addRowKeyDfT", "0", "addRowKeyDfF", getColName(1), "1"); + verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(0), null); + verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(1), columnValues[1]); } @Test public void testAddCompositeKey() throws IOException { - String[] types = { "INT", "INT" }; - String[] vals = { "0", "1" }; - createTableWithColTypes(types, vals); - - String[] otherArg = getArgv(true, "addRowKeyT", "addRowKeyF", true, null); - String[] argv = new String[otherArg.length + 4]; - argv[0]="-D"; - argv[1]="sqoop.hbase.add.row.key=true"; - System.arraycopy(otherArg, 0, argv, 2, otherArg.length); - argv[argv.length - 2] = "--hbase-row-key"; - argv[argv.length - 1] = getColName(0)+","+getColName(1); + String rowKey = getColName(0)+","+getColName(1); + + String[] argv = 
getImportArguments(true, hbaseTableName, hbaseColumnFamily, rowKey); runImport(argv); // Row key should have been added - verifyHBaseCell("addRowKeyT", "0_1", "addRowKeyF", getColName(0), "0"); - verifyHBaseCell("addRowKeyT", "0_1", "addRowKeyF", getColName(1), "1"); + verifyHBaseCell(hbaseTableName, join(columnValues, '_'), hbaseColumnFamily, getColName(0), columnValues[0]); + verifyHBaseCell(hbaseTableName, join(columnValues, '_'), hbaseColumnFamily, getColName(1), columnValues[1]); + } + + private String[] getImportArguments(boolean addRowKey, String hbaseTableName, String hbaseColumnFamily) { + return getImportArguments(addRowKey, hbaseTableName, hbaseColumnFamily, null); + } + + private String[] getImportArguments(boolean addRowKey, String hbaseTableName, String hbaseColumnFamily, String rowKey) { + List<String> result = new ArrayList<>(); + + if (addRowKey) { + result.add("-D"); + result.add("sqoop.hbase.add.row.key=true"); + } + result.add("-D"); + result.add("hbase.fs.tmp.dir=" + hbaseTmpDir); + + result.addAll(asList(getArgv(true, hbaseTableName, hbaseColumnFamily, true, null))); + + if(bulkLoad) { + result.add("--target-dir"); + result.add(hbaseBulkLoadDir); + result.add("--hbase-bulkload"); + } + + if (!StringUtils.isBlank(rowKey)) { + result.add("--hbase-row-key"); + result.add(rowKey); + } + + return result.toArray(new String[result.size()]); + } + + public static junit.framework.Test suite() { + return new JUnit4TestAdapter(HBaseImportAddRowKeyTest.class); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java index 37dc004..ad92a07 100644 --- a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java +++ b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java @@ -88,7 +88,7 @@ public abstract class 
HBaseTestCase extends ImportJobTestCase { if (includeHadoopFlags) { CommonArgs.addHadoopFlags(args); args.add("-D"); - args.add("hbase.zookeeper.property.clientPort=21818"); + args.add("hbase.zookeeper.property.clientPort=" + zookeeperPort); } if (null != queryStr) { @@ -120,40 +120,33 @@ public abstract class HBaseTestCase extends ImportJobTestCase { private String workDir = createTempDir().getAbsolutePath(); private MiniZooKeeperCluster zookeeperCluster; private MiniHBaseCluster hbaseCluster; + private int zookeeperPort; @Override @Before public void setUp() { try { + String zookeeperDir = new File(workDir, "zk").getAbsolutePath(); + zookeeperCluster = new MiniZooKeeperCluster(); + zookeeperCluster.startup(new File(zookeeperDir)); + zookeeperPort = zookeeperCluster.getClientPort(); + HBaseTestCase.recordTestBuildDataProperty(); String hbaseDir = new File(workDir, "hbase").getAbsolutePath(); String hbaseRoot = "file://" + hbaseDir; Configuration hbaseConf = HBaseConfiguration.create(); hbaseConf.set(HConstants.HBASE_DIR, hbaseRoot); //Hbase 0.90 does not have HConstants.ZOOKEEPER_CLIENT_PORT - hbaseConf.setInt("hbase.zookeeper.property.clientPort", 21818); + hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort); hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "0.0.0.0"); hbaseConf.setInt("hbase.master.info.port", -1); hbaseConf.setInt("hbase.zookeeper.property.maxClientCnxns", 500); - String zookeeperDir = new File(workDir, "zk").getAbsolutePath(); - int zookeeperPort = 21818; - zookeeperCluster = new MiniZooKeeperCluster(); - Method m; - Class<?> zkParam[] = {Integer.TYPE}; - try { - m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort", - zkParam); - } catch (NoSuchMethodException e) { - m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort", - zkParam); - } - m.invoke(zookeeperCluster, new Object[]{new Integer(zookeeperPort)}); - zookeeperCluster.startup(new File(zookeeperDir)); hbaseCluster = new 
MiniHBaseCluster(hbaseConf, 1); HMaster master = hbaseCluster.getMaster(); Object serverName = master.getServerName(); String hostAndPort; + Method m; if (serverName instanceof String) { System.out.println("Server name is string, using HServerAddress."); m = HMaster.class.getDeclaredMethod("getMasterAddress",
