Repository: sqoop
Updated Branches:
  refs/heads/trunk 0f13c474b -> b4afcf417


SQOOP-2952: Fixing bug (row key not added into column family using 
--hbase-bulkload)

(Szabolcs Vasas via Attila Szabo)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b4afcf41
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b4afcf41
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b4afcf41

Branch: refs/heads/trunk
Commit: b4afcf4179b13c25b5e9bd182d75cab5d2e6c8d1
Parents: 0f13c47
Author: Attila Szabo <mau...@apache.org>
Authored: Thu Oct 13 14:38:21 2016 +0200
Committer: Attila Szabo <mau...@apache.org>
Committed: Thu Oct 13 14:38:21 2016 +0200

----------------------------------------------------------------------
 build.xml                                       |  14 ++
 ivy.xml                                         |  23 ++++
 .../apache/sqoop/hbase/HBasePutProcessor.java   |  32 ++---
 .../org/apache/sqoop/hbase/PutTransformer.java  |   4 +
 .../sqoop/hbase/ToStringPutTransformer.java     |  30 ++++-
 .../sqoop/mapreduce/HBaseBulkImportMapper.java  |   3 +-
 .../sqoop/hbase/HBaseImportAddRowKeyTest.java   | 128 ++++++++++++++-----
 .../com/cloudera/sqoop/hbase/HBaseTestCase.java |  25 ++--
 8 files changed, 175 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 97e5502..7f948b3 100644
--- a/build.xml
+++ b/build.xml
@@ -185,6 +185,20 @@
         <property name="avrohadoopprofile" value="2" />
       </then>
     </elseif>
+
+    <!-- hadoopversion 260 is created for testing purposes only. Do not use it in production! -->
+    <elseif>
+      <equals arg1="${hadoopversion}" arg2="260" />
+      <then>
+        <property name="hadoop.version" value="2.6.0" />
+        <property name="hbase95.version" value="1.2.0" />
+        <property name="zookeeper.version" value="3.4.5" />
+        <property name="hadoop.version.full" value="2.6.0" />
+        <property name="hcatalog.version" value="0.13.0" />
+        <property name="hbasecompatprofile" value="2" />
+        <property name="avrohadoopprofile" value="2" />
+      </then>
+    </elseif>
     <else>
      <fail message="Unrecognized hadoopversion. Can only be 20, 23, 100, 200 or 210." />
     </else>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index a502530..ee1dafa 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -55,6 +55,8 @@ under the License.
       
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo"
 />
     <conf name="hadoop210" visibility="private"
       
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo"
 />
+    <conf name="hadoop260" visibility="private"
+      extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" />
 
     <conf name="test" visibility="private" extends="common,runtime"/>
     <conf name="hadoop23test" visibility="private" extends="test,hadoop23" />
@@ -62,6 +64,7 @@ under the License.
     <conf name="hadoop100test" visibility="private" extends="test,hadoop100" />
     <conf name="hadoop200test" visibility="private" extends="test,hadoop200" />
     <conf name="hadoop210test" visibility="private" extends="test,hadoop210" />
+    <conf name="hadoop260test" visibility="private" extends="test,hadoop260" />
 
     <!-- We don't redistribute everything we depend on (e.g., Hadoop itself);
          anything which Hadoop itself also depends on, we do not ship.
@@ -105,6 +108,26 @@ under the License.
     <dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}"
       conf="hadoop210->default"/>
 
+    <!-- Dependencies for Hadoop 2.6.0 -->
+    <dependency org="org.apache.hadoop" name="hadoop-common"
+      rev="${hadoop.version}" conf="hadoop260->default">
+      <artifact name="hadoop-common" type="jar" />
+      <artifact name="hadoop-common" type="jar" m:classifier="tests"/>
+    </dependency>
+    <dependency org="org.apache.hadoop" name="hadoop-hdfs"
+      rev="${hadoop.version}" conf="hadoop260->default">
+      <artifact name="hadoop-hdfs" type="jar" />
+      <artifact name="hadoop-hdfs" type="jar" m:classifier="tests"/>
+    </dependency>
+    <dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-common"
+      rev="${hadoop.version}" conf="hadoop260->default"/>
+    <dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core"
+      rev="${hadoop.version}" conf="hadoop260->default"/>
+    <dependency org="org.aspectj" name="aspectjtools" rev="${aspectj.version}"
+      conf="hadoop260->default"/>
+    <dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}"
+      conf="hadoop260->default"/>
+
     <!-- Dependencies for Hadoop 2.0.0 -->
     <dependency org="org.apache.hadoop" name="hadoop-common"
       rev="${hadoop.version}" conf="hadoop200->default">

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java 
b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
index b2431ac..fdbe127 100644
--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
@@ -18,11 +18,9 @@
 
 package org.apache.sqoop.hbase;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
+import com.cloudera.sqoop.lib.FieldMapProcessor;
+import com.cloudera.sqoop.lib.FieldMappable;
+import com.cloudera.sqoop.lib.ProcessingException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
@@ -31,11 +29,11 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.sqoop.mapreduce.ImportJobBase;
 
-import com.cloudera.sqoop.lib.FieldMappable;
-import com.cloudera.sqoop.lib.FieldMapProcessor;
-import com.cloudera.sqoop.lib.ProcessingException;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 
 /**
  * SqoopRecordProcessor that performs an HBase "put" operation
@@ -105,21 +103,7 @@ public class HBasePutProcessor implements Closeable, 
Configurable,
     if (null == putTransformer) {
       throw new RuntimeException("Could not instantiate PutTransformer.");
     }
-
-    this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
-    this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
-
-    if (this.putTransformer instanceof ToStringPutTransformer) {
-      ToStringPutTransformer stringPutTransformer =
-          (ToStringPutTransformer) this.putTransformer;
-      stringPutTransformer.bigDecimalFormatString =
-          conf.getBoolean(ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
-              ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
-      stringPutTransformer.addRowKey =
-          conf.getBoolean(HBasePutProcessor.ADD_ROW_KEY,
-              HBasePutProcessor.ADD_ROW_KEY_DEFAULT);
-      stringPutTransformer.detectCompositeKey();
-    }
+    putTransformer.init(conf);
 
     this.tableName = conf.get(TABLE_NAME_KEY, null);
     try {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/java/org/apache/sqoop/hbase/PutTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/PutTransformer.java 
b/src/java/org/apache/sqoop/hbase/PutTransformer.java
index 8d6bcac..533467e 100644
--- a/src/java/org/apache/sqoop/hbase/PutTransformer.java
+++ b/src/java/org/apache/sqoop/hbase/PutTransformer.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Put;
 
 /**
@@ -33,6 +35,8 @@ public abstract class PutTransformer {
   private String columnFamily;
   private String rowKeyColumn;
 
+  public abstract void init(Configuration conf);
+
   /**
    * @return the default column family to insert into.
    */

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java 
b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
index b5cad1d..363e145 100644
--- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
+++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
@@ -18,6 +18,15 @@
 
 package org.apache.sqoop.hbase;
 
+import com.cloudera.sqoop.hbase.PutTransformer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.mapreduce.ImportJobBase;
+
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
@@ -27,13 +36,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.StringUtils;
-
-import com.cloudera.sqoop.hbase.PutTransformer;
+import static org.apache.sqoop.hbase.HBasePutProcessor.ADD_ROW_KEY;
+import static org.apache.sqoop.hbase.HBasePutProcessor.ADD_ROW_KEY_DEFAULT;
+import static org.apache.sqoop.hbase.HBasePutProcessor.COL_FAMILY_KEY;
+import static org.apache.sqoop.hbase.HBasePutProcessor.ROW_KEY_COLUMN_KEY;
 
 /**
  * PutTransformer that calls toString on all non-null fields.
@@ -204,4 +210,14 @@ public class ToStringPutTransformer extends PutTransformer 
{
     return valString;
   }
 
+  @Override
+  public void init(Configuration conf) {
+    setColumnFamily(conf.get(COL_FAMILY_KEY, null));
+    setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
+
+    this.bigDecimalFormatString = conf.getBoolean(ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
+              ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+    this.addRowKey = conf.getBoolean(ADD_ROW_KEY, ADD_ROW_KEY_DEFAULT);
+    detectCompositeKey();
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java 
b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
index 363b5d7..58ccee7 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
@@ -66,8 +66,7 @@ public class HBaseBulkImportMapper
     if (null == putTransformer) {
       throw new RuntimeException("Could not instantiate PutTransformer.");
     }
-    this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
-    this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
+    putTransformer.init(conf);
   }
   @Override
   public void map(LongWritable key, SqoopRecord val, Context context)

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java 
b/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
index cfbb1d3..abf9f1c 100644
--- a/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
+++ b/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
@@ -18,68 +18,126 @@
 
 package com.cloudera.sqoop.hbase;
 
+import junit.framework.JUnit4TestAdapter;
+import org.apache.commons.lang.StringUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
-import org.junit.Test;
+import static java.util.Arrays.asList;
+import static org.apache.commons.lang.StringUtils.join;
 
-/**
- *
- */
+@RunWith(Parameterized.class)
 public class HBaseImportAddRowKeyTest extends HBaseTestCase {
 
+  @Parameterized.Parameters(name = "bulkLoad = {0}")
+  public static Iterable<? extends Object> bulkLoadParameters() {
+    return Arrays.asList(new Boolean[] { false } , new Boolean[] { true } );
+  }
+
+  private String[] columnTypes;
+
+  private String[] columnValues;
+
+  private String hbaseTableName;
+
+  private String hbaseColumnFamily;
+
+  private String hbaseTmpDir;
+
+  private String hbaseBulkLoadDir;
+
+  private boolean bulkLoad;
+
+  public HBaseImportAddRowKeyTest(boolean bulkLoad) {
+    this.bulkLoad = bulkLoad;
+  }
+
+  @Before
+  public void setUp() {
+    super.setUp();
+    columnTypes = new String[] { "INT", "INT" };
+    columnValues = new String[] { "0", "1" };
+    hbaseTableName = "addRowKeyTable";
+    hbaseColumnFamily = "addRowKeyFamily";
+    hbaseTmpDir = TEMP_BASE_DIR + "hbaseTmpDir";
+    hbaseBulkLoadDir = TEMP_BASE_DIR + "hbaseBulkLoadDir";
+    createTableWithColTypes(columnTypes, columnValues);
+  }
+
   @Test
   public void testAddRowKey() throws IOException {
-    String[] types = { "INT", "INT" };
-    String[] vals = { "0", "1" };
-    createTableWithColTypes(types, vals);
-
-    String[] otherArg = getArgv(true, "addRowKeyT", "addRowKeyF", true, null);
-    String[] argv = new String[otherArg.length + 2];
-    argv[0] = "-D";
-    argv[1] = "sqoop.hbase.add.row.key=true";
-    System.arraycopy(otherArg, 0, argv, 2, otherArg.length);
+    String[] argv = getImportArguments(true, hbaseTableName, hbaseColumnFamily);
 
     runImport(argv);
 
     // Row key should have been added
-    verifyHBaseCell("addRowKeyT", "0", "addRowKeyF", getColName(0), "0");
-    verifyHBaseCell("addRowKeyT", "0", "addRowKeyF", getColName(1), "1");
+    verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(0), columnValues[0]);
+    verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(1), columnValues[1]);
   }
 
   @Test
   public void testAddRowKeyDefault() throws IOException {
-    String[] types = { "INT", "INT" };
-    String[] vals = { "0", "1" };
-    createTableWithColTypes(types, vals);
-
-    String[] argv = getArgv(true, "addRowKeyDfT", "addRowKeyDfF", true, null);
+    String[] argv = getImportArguments(false, hbaseTableName, hbaseColumnFamily);
 
     runImport(argv);
 
     // Row key should not be added by default
-    verifyHBaseCell("addRowKeyDfT", "0", "addRowKeyDfF", getColName(0), null);
-    verifyHBaseCell("addRowKeyDfT", "0", "addRowKeyDfF", getColName(1), "1");
+    verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(0), null);
+    verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(1), columnValues[1]);
   }
 
   @Test
   public void testAddCompositeKey() throws IOException {
-    String[] types = { "INT", "INT" };
-    String[] vals = { "0", "1" };
-    createTableWithColTypes(types, vals);
-
-    String[] otherArg = getArgv(true, "addRowKeyT", "addRowKeyF", true, null);
-    String[] argv = new String[otherArg.length + 4];
-    argv[0]="-D";
-    argv[1]="sqoop.hbase.add.row.key=true";
-    System.arraycopy(otherArg, 0, argv, 2, otherArg.length);
-    argv[argv.length - 2] = "--hbase-row-key";
-    argv[argv.length - 1] = getColName(0)+","+getColName(1);
+    String rowKey = getColName(0)+","+getColName(1);
+
+    String[] argv = getImportArguments(true, hbaseTableName, hbaseColumnFamily, rowKey);
 
     runImport(argv);
 
     // Row key should have been added
-    verifyHBaseCell("addRowKeyT", "0_1", "addRowKeyF", getColName(0), "0");
-    verifyHBaseCell("addRowKeyT", "0_1", "addRowKeyF", getColName(1), "1");
+    verifyHBaseCell(hbaseTableName, join(columnValues, '_'), hbaseColumnFamily, getColName(0), columnValues[0]);
+    verifyHBaseCell(hbaseTableName, join(columnValues, '_'), hbaseColumnFamily, getColName(1), columnValues[1]);
+  }
+
+  private String[] getImportArguments(boolean addRowKey, String hbaseTableName, String hbaseColumnFamily) {
+    return getImportArguments(addRowKey, hbaseTableName, hbaseColumnFamily, null);
+  }
+
+  private String[] getImportArguments(boolean addRowKey, String hbaseTableName, String hbaseColumnFamily, String rowKey) {
+    List<String> result = new ArrayList<>();
+
+    if (addRowKey) {
+      result.add("-D");
+      result.add("sqoop.hbase.add.row.key=true");
+    }
+    result.add("-D");
+    result.add("hbase.fs.tmp.dir=" + hbaseTmpDir);
+
+    result.addAll(asList(getArgv(true, hbaseTableName, hbaseColumnFamily, true, null)));
+
+    if(bulkLoad) {
+      result.add("--target-dir");
+      result.add(hbaseBulkLoadDir);
+      result.add("--hbase-bulkload");
+    }
+
+    if (!StringUtils.isBlank(rowKey)) {
+      result.add("--hbase-row-key");
+      result.add(rowKey);
+    }
+
+    return result.toArray(new String[result.size()]);
+  }
+
+  public static junit.framework.Test suite() {
+    return new JUnit4TestAdapter(HBaseImportAddRowKeyTest.class);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java 
b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
index 37dc004..ad92a07 100644
--- a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
+++ b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
@@ -88,7 +88,7 @@ public abstract class HBaseTestCase extends ImportJobTestCase 
{
     if (includeHadoopFlags) {
       CommonArgs.addHadoopFlags(args);
       args.add("-D");
-      args.add("hbase.zookeeper.property.clientPort=21818");
+      args.add("hbase.zookeeper.property.clientPort=" + zookeeperPort);
     }
 
     if (null != queryStr) {
@@ -120,40 +120,33 @@ public abstract class HBaseTestCase extends 
ImportJobTestCase {
   private String workDir = createTempDir().getAbsolutePath();
   private MiniZooKeeperCluster zookeeperCluster;
   private MiniHBaseCluster hbaseCluster;
+  private int zookeeperPort;
 
   @Override
   @Before
   public void setUp() {
     try {
+      String zookeeperDir = new File(workDir, "zk").getAbsolutePath();
+      zookeeperCluster = new MiniZooKeeperCluster();
+      zookeeperCluster.startup(new File(zookeeperDir));
+      zookeeperPort = zookeeperCluster.getClientPort();
+
       HBaseTestCase.recordTestBuildDataProperty();
       String hbaseDir = new File(workDir, "hbase").getAbsolutePath();
       String hbaseRoot = "file://" + hbaseDir;
       Configuration hbaseConf = HBaseConfiguration.create();
       hbaseConf.set(HConstants.HBASE_DIR, hbaseRoot);
       //Hbase 0.90 does not have HConstants.ZOOKEEPER_CLIENT_PORT
-      hbaseConf.setInt("hbase.zookeeper.property.clientPort", 21818);
+      hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
       hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "0.0.0.0");
       hbaseConf.setInt("hbase.master.info.port", -1);
       hbaseConf.setInt("hbase.zookeeper.property.maxClientCnxns", 500);
-      String zookeeperDir = new File(workDir, "zk").getAbsolutePath();
-      int zookeeperPort = 21818;
-      zookeeperCluster = new MiniZooKeeperCluster();
-      Method m;
-      Class<?> zkParam[] = {Integer.TYPE};
-      try {
-        m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort",
-                zkParam);
-      } catch (NoSuchMethodException e) {
-        m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort",
-                zkParam);
-      }
-      m.invoke(zookeeperCluster, new Object[]{new Integer(zookeeperPort)});
-      zookeeperCluster.startup(new File(zookeeperDir));
       hbaseCluster = new MiniHBaseCluster(hbaseConf, 1);
       HMaster master = hbaseCluster.getMaster();
       Object serverName = master.getServerName();
 
       String hostAndPort;
+      Method m;
       if (serverName instanceof String) {
         System.out.println("Server name is string, using HServerAddress.");
         m = HMaster.class.getDeclaredMethod("getMasterAddress",

Reply via email to