Author: brock
Date: Tue Sep 30 17:58:20 2014
New Revision: 1628502

URL: http://svn.apache.org/r1628502
Log:
HIVE-6148 - Support arbitrary structs stored in HBase (Swarnim Kulkarni via 
Brock)

Added:
    
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseStructValue.java
    
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/StructHBaseValueFactory.java
    
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestStructSerializer.java
Modified:
    
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
    
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
    
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java
    
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
    
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
    
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/AvroHBaseValueFactory.java
    
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java
    
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseValueFactory.java
    
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java

Modified: 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
--- 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
 (original)
+++ 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
 Tue Sep 30 17:58:20 2014
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.hbase;
 
+import java.io.IOException;
+import java.util.Properties;
+
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
 import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
@@ -26,9 +29,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
-import java.io.IOException;
-import java.util.Properties;
-
 public class DefaultHBaseKeyFactory extends AbstractHBaseKeyFactory implements 
HBaseKeyFactory {
 
   protected LazySimpleSerDe.SerDeParameters serdeParams;

Modified: 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
--- 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java 
(original)
+++ 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java 
Tue Sep 30 17:58:20 2014
@@ -53,6 +53,7 @@ public class HBaseSerDe extends Abstract
   public static final String HBASE_COMPOSITE_KEY_CLASS = 
"hbase.composite.key.class";
   public static final String HBASE_COMPOSITE_KEY_TYPES = 
"hbase.composite.key.types";
   public static final String HBASE_COMPOSITE_KEY_FACTORY = 
"hbase.composite.key.factory";
+  public static final String HBASE_STRUCT_SERIALIZER_CLASS = 
"hbase.struct.serialization.class";
   public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
   public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
   public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";
@@ -98,7 +99,7 @@ public class HBaseSerDe extends Abstract
 
     cachedHBaseRow = new LazyHBaseRow(
         (LazySimpleStructObjectInspector) cachedObjectInspector,
-        serdeParams.getKeyIndex(), serdeParams.getKeyFactory());
+            serdeParams.getKeyIndex(), serdeParams.getKeyFactory(), 
serdeParams.getValueFactories());
 
     serializer = new HBaseRowSerializer(serdeParams);
 

Modified: 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
--- 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java
 (original)
+++ 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java
 Tue Sep 30 17:58:20 2014
@@ -41,6 +41,10 @@ import org.apache.hadoop.hive.serde.serd
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import 
org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.util.StringUtils;
 
@@ -371,6 +375,19 @@ public class HBaseSerDeHelper {
   }
 
   /**
+   * Create the {@link LazyObjectBase lazy field}
+   * */
+  public static LazyObjectBase createLazyField(ColumnMapping[] columnMappings, 
int fieldID,
+      ObjectInspector inspector) {
+    ColumnMapping colMap = columnMappings[fieldID];
+    if (colMap.getQualifierName() == null && !colMap.isHbaseRowKey()) {
+      // a column family
+      return new LazyHBaseCellMap((LazyMapObjectInspector) inspector);
+    }
+    return LazyFactory.createLazyObject(inspector, 
colMap.getBinaryStorage().get(0));
+  }
+
+  /**
    * Auto-generates the key struct for composite keys
    * 
    * @param compositeKeyParts map of composite key part name to its type. 
Usually this would be

Modified: 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
--- 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
 (original)
+++ 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
 Tue Sep 30 17:58:20 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.hbase.Colu
 import org.apache.hadoop.hive.hbase.struct.AvroHBaseValueFactory;
 import org.apache.hadoop.hive.hbase.struct.DefaultHBaseValueFactory;
 import org.apache.hadoop.hive.hbase.struct.HBaseValueFactory;
+import org.apache.hadoop.hive.hbase.struct.StructHBaseValueFactory;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
@@ -204,11 +205,21 @@ public class HBaseSerDeParameters {
       for (int i = 0; i < columnMappings.size(); i++) {
         String serType = getSerializationType(conf, tbl, 
columnMappings.getColumnsMapping()[i]);
 
-        if (serType != null && serType.equals(AVRO_SERIALIZATION_TYPE)) {
+        if (AVRO_SERIALIZATION_TYPE.equals(serType)) {
           Schema schema = getSchema(conf, tbl, 
columnMappings.getColumnsMapping()[i]);
-          valueFactories.add(new AvroHBaseValueFactory(schema));
+          valueFactories.add(new AvroHBaseValueFactory(i, schema));
+        } else if (STRUCT_SERIALIZATION_TYPE.equals(serType)) {
+          String structValueClassName = 
tbl.getProperty(HBaseSerDe.HBASE_STRUCT_SERIALIZER_CLASS);
+
+          if (structValueClassName == null) {
+            throw new 
IllegalArgumentException(HBaseSerDe.HBASE_STRUCT_SERIALIZER_CLASS
+                + " must be set for hbase columns of type [" + 
STRUCT_SERIALIZATION_TYPE + "]");
+          }
+
+          Class<?> structValueClass = job.getClassByName(structValueClassName);
+          valueFactories.add(new StructHBaseValueFactory(i, structValueClass));
         } else {
-          valueFactories.add(new DefaultHBaseValueFactory());
+          valueFactories.add(new DefaultHBaseValueFactory(i));
         }
       }
     } catch (Exception e) {

Modified: 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
--- 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
 (original)
+++ 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
 Tue Sep 30 17:58:20 2014
@@ -20,15 +20,15 @@ package org.apache.hadoop.hive.hbase;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hive.hbase.struct.HBaseValueFactory;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
-import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
 import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
 import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
-import 
org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
 import 
org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 
@@ -47,18 +47,21 @@ public class LazyHBaseRow extends LazySt
 
   private final int iKey;
   private final HBaseKeyFactory keyFactory;
+  private final List<HBaseValueFactory> valueFactories;
 
   public LazyHBaseRow(LazySimpleStructObjectInspector oi) {
-    this(oi, -1, null);
+    this(oi, -1, null, null);
   }
 
   /**
    * Construct a LazyHBaseRow object with the ObjectInspector.
    */
-  public LazyHBaseRow(LazySimpleStructObjectInspector oi, int iKey, 
HBaseKeyFactory keyFactory) {
+  public LazyHBaseRow(LazySimpleStructObjectInspector oi, int iKey, 
HBaseKeyFactory keyFactory,
+      List<HBaseValueFactory> valueFactories) {
     super(oi);
     this.iKey = iKey;
     this.keyFactory = keyFactory;
+    this.valueFactories = valueFactories;
   }
 
   /**
@@ -76,13 +79,14 @@ public class LazyHBaseRow extends LazySt
     if (fieldID == iKey) {
       return keyFactory.createKey(fieldRef.getFieldObjectInspector());
     }
-    ColumnMapping colMap = columnsMapping[fieldID];
-    if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
-      // a column family
-      return new LazyHBaseCellMap((LazyMapObjectInspector) 
fieldRef.getFieldObjectInspector());
+
+    if (valueFactories != null) {
+      return 
valueFactories.get(fieldID).createValueObject(fieldRef.getFieldObjectInspector());
     }
-    return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector(),
-        colMap.binaryStorage.get(0));
+
+    // fallback to default
+    return HBaseSerDeHelper.createLazyField(columnsMapping, fieldID,
+        fieldRef.getFieldObjectInspector());
   }
 
   /**

Modified: 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/AvroHBaseValueFactory.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/AvroHBaseValueFactory.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
--- 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/AvroHBaseValueFactory.java
 (original)
+++ 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/AvroHBaseValueFactory.java
 Tue Sep 30 17:58:20 2014
@@ -48,7 +48,8 @@ public class AvroHBaseValueFactory exten
    * 
    * @param schema the associated {@link Schema schema}
    * */
-  public AvroHBaseValueFactory(Schema schema) {
+  public AvroHBaseValueFactory(int fieldID, Schema schema) {
+    super(fieldID);
     this.schema = schema;
   }
 

Modified: 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
--- 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java
 (original)
+++ 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java
 Tue Sep 30 17:58:20 2014
@@ -21,9 +21,12 @@ import java.io.IOException;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.hbase.ColumnMappings;
+import org.apache.hadoop.hive.hbase.HBaseSerDeHelper;
 import org.apache.hadoop.hive.hbase.HBaseSerDeParameters;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -35,15 +38,23 @@ import org.apache.hadoop.hive.serde2.typ
 public class DefaultHBaseValueFactory implements HBaseValueFactory{
 
   protected LazySimpleSerDe.SerDeParameters serdeParams;
+  protected ColumnMappings columnMappings;
   protected HBaseSerDeParameters hbaseParams;
   protected Properties properties;
   protected Configuration conf;
 
+  private int fieldID;
+
+  public DefaultHBaseValueFactory(int fieldID) {
+    this.fieldID = fieldID;
+  }
+
        @Override
   public void init(HBaseSerDeParameters hbaseParams, Configuration conf, 
Properties properties)
                        throws SerDeException {
     this.hbaseParams = hbaseParams;
     this.serdeParams = hbaseParams.getSerdeParams();
+    this.columnMappings = hbaseParams.getColumnMappings();
     this.properties = properties;
     this.conf = conf;
        }
@@ -55,6 +66,11 @@ public class DefaultHBaseValueFactory im
         1, serdeParams.getNullSequence(), serdeParams.isEscaped(), 
serdeParams.getEscapeChar());
        }
 
+  @Override
+  public LazyObjectBase createValueObject(ObjectInspector inspector) throws 
SerDeException {
+    return 
HBaseSerDeHelper.createLazyField(columnMappings.getColumnsMapping(), fieldID, 
inspector);
+  }
+
        @Override
        public byte[] serializeValue(Object object, StructField field)
                        throws IOException {

Added: 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseStructValue.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseStructValue.java?rev=1628502&view=auto
==============================================================================
--- 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseStructValue.java
 (added)
+++ 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseStructValue.java
 Tue Sep 30 17:58:20 2014
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.hbase.struct;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import 
org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+
+/**
+ * This is an extension of LazyStruct. All value structs should extend this 
class and override the
+ * {@link LazyStruct#getField(int)} method where fieldID corresponds to the ID 
of a value in the
+ * value structure.
+ * <p>
+ * For example, for a value structure <i>"/part1/part2/part3"</i>, 
<i>part1</i> will have an id
+ * <i>0</i>, <i>part2</i> will have an id <i>1</i> and <i>part3</i> will have 
an id <i>2</i>. Custom
+ * implementations of getField(fieldID) should return the value corresponding 
to that fieldID. So,
+ * for the above example, the value returned for <i>getField(0)</i> should be 
<i>part1</i>,
+ * <i>getField(1)</i> should be <i>part2</i> and <i>getField(2)</i> should be 
<i>part3</i>.
+ * </p>
+ * <p>
+ * All implementation are expected to have a constructor of the form <br>
+ *
+ * <pre>
+ * MyCustomStructObject(LazySimpleStructObjectInspector oi, Properties props, 
Configuration conf, ColumnMapping colMap)
+ * </pre>
+ * 
+ * </p>
+ * */
+public class HBaseStructValue extends LazyStruct {
+
+  /**
+   * The column family name
+   */
+  protected String familyName;
+
+  /**
+   * The column qualifier name
+   */
+  protected String qualifierName;
+
+  public HBaseStructValue(LazySimpleStructObjectInspector oi) {
+    super(oi);
+  }
+
+  /**
+   * Set the row data for this LazyStruct.
+   * 
+   * @see LazyObject#init(ByteArrayRef, int, int)
+   * 
+   * @param familyName The column family name
+   * @param qualifierName The column qualifier name
+   */
+  public void init(ByteArrayRef bytes, int start, int length, String 
familyName,
+      String qualifierName) {
+    init(bytes, start, length);
+    this.familyName = familyName;
+    this.qualifierName = qualifierName;
+  }
+
+  @Override
+  public ArrayList<Object> getFieldsAsList() {
+    ArrayList<Object> allFields = new ArrayList<Object>();
+
+    List<? extends StructField> fields = oi.getAllStructFieldRefs();
+
+    for (int i = 0; i < fields.size(); i++) {
+      allFields.add(getField(i));
+    }
+
+    return allFields;
+  }
+
+  /**
+   * Create and initialize a {@link LazyObject} with the given bytes for the 
given fieldID.
+   * 
+   * @param fieldID field for which the object is to be created
+   * @param bytes value with which the object is to be initialized with
+   * @return initialized {@link LazyObject}
+   * */
+  public LazyObject<? extends ObjectInspector> toLazyObject(int fieldID, 
byte[] bytes) {
+    ObjectInspector fieldOI = 
oi.getAllStructFieldRefs().get(fieldID).getFieldObjectInspector();
+
+    LazyObject<? extends ObjectInspector> lazyObject = 
LazyFactory.createLazyObject(fieldOI);
+
+    ByteArrayRef ref = new ByteArrayRef();
+
+    ref.setData(bytes);
+
+    // initialize the lazy object
+    lazyObject.init(ref, 0, ref.getData().length);
+
+    return lazyObject;
+  }
+}
\ No newline at end of file

Modified: 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseValueFactory.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseValueFactory.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
--- 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseValueFactory.java
 (original)
+++ 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseValueFactory.java
 Tue Sep 30 17:58:20 2014
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.hbase.HBaseKeyFactory;
 import org.apache.hadoop.hive.hbase.HBaseSerDeParameters;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -53,6 +55,13 @@ public interface HBaseValueFactory {
   ObjectInspector createValueObjectInspector(TypeInfo type) throws 
SerDeException;
 
   /**
+   * create custom object for hbase value
+   *
+   * @param inspector OI created by {@link 
HBaseKeyFactory#createKeyObjectInspector}
+   */
+  LazyObjectBase createValueObject(ObjectInspector inspector) throws 
SerDeException;
+
+  /**
    * Serialize the given hive object
    * 
    * @param object the object to be serialized

Added: 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/StructHBaseValueFactory.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/StructHBaseValueFactory.java?rev=1628502&view=auto
==============================================================================
--- 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/StructHBaseValueFactory.java
 (added)
+++ 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/StructHBaseValueFactory.java
 Tue Sep 30 17:58:20 2014
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.hbase.struct;
+
+import java.lang.reflect.Constructor;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import 
org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * Implementation of {@link HBaseValueFactory} to consume a custom struct
+ * */
+public class StructHBaseValueFactory<T extends HBaseStructValue> extends 
DefaultHBaseValueFactory {
+
+  private final int fieldID;
+  private final Constructor constructor;
+
+  public StructHBaseValueFactory(int fieldID, Class<?> structValueClass) 
throws Exception {
+    super(fieldID);
+    this.fieldID = fieldID;
+    this.constructor =
+        
structValueClass.getDeclaredConstructor(LazySimpleStructObjectInspector.class,
+            Properties.class, Configuration.class, ColumnMapping.class);
+  }
+
+  @Override
+  public LazyObjectBase createValueObject(ObjectInspector inspector) throws 
SerDeException {
+    try {
+      return (T) constructor.newInstance(inspector, properties, 
hbaseParams.getBaseConfiguration(),
+          hbaseParams.getColumnMappings().getColumnsMapping()[fieldID]);
+    } catch (Exception e) {
+      throw new SerDeException(e);
+    }
+  }
+}
\ No newline at end of file

Added: 
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestStructSerializer.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestStructSerializer.java?rev=1628502&view=auto
==============================================================================
--- 
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestStructSerializer.java
 (added)
+++ 
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestStructSerializer.java
 Tue Sep 30 17:58:20 2014
@@ -0,0 +1,75 @@
+package org.apache.hadoop.hive.hbase;
+
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hive.hbase.struct.HBaseStructValue;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import 
org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * Test specific implementation of {@link 
org.apache.hadoop.hive.serde2.lazy.LazyStruct}
+ */
+public class HBaseTestStructSerializer extends HBaseStructValue {
+
+  protected byte[] bytes;
+  protected String bytesAsString;
+  protected Properties tbl;
+  protected Configuration conf;
+  protected ColumnMapping colMapping;
+  protected String testValue;
+
+  public HBaseTestStructSerializer(LazySimpleStructObjectInspector oi, 
Properties tbl,
+      Configuration conf, ColumnMapping colMapping) {
+    super(oi);
+    this.tbl = tbl;
+    this.conf = conf;
+    this.colMapping = colMapping;
+  }
+
+  @Override
+  public void init(ByteArrayRef bytes, int start, int length) {
+    this.bytes = bytes.getData();
+  }
+
+  @Override
+  public Object getField(int fieldID) {
+    if (bytesAsString == null) {
+      bytesAsString = Bytes.toString(bytes).trim();
+    }
+
+    // Randomly pick the character corresponding to the field id and convert 
it to byte array
+    byte[] fieldBytes = new byte[] { (byte) bytesAsString.charAt(fieldID) };
+
+    return toLazyObject(fieldID, fieldBytes);
+  }
+
+  /**
+   * Create and initialize a {@link LazyObject} with the given bytes for the 
given fieldID.
+   *
+   * @param fieldID field for which the object is to be created
+   * @param bytes value with which the object is to be initialized with
+   * 
+   * @return initialized {@link LazyObject}
+   * */
+  @Override
+  public LazyObject<? extends ObjectInspector> toLazyObject(int fieldID, 
byte[] bytes) {
+    ObjectInspector fieldOI = 
oi.getAllStructFieldRefs().get(fieldID).getFieldObjectInspector();
+
+    LazyObject<? extends ObjectInspector> lazyObject = 
LazyFactory.createLazyObject(fieldOI);
+
+    ByteArrayRef ref = new ByteArrayRef();
+
+    ref.setData(bytes);
+
+    // initialize the lazy object
+    lazyObject.init(ref, 0, ref.getData().length);
+
+    return lazyObject;
+  }
+}
\ No newline at end of file

Modified: 
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
--- 
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
 (original)
+++ 
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
 Tue Sep 30 17:58:20 2014
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.avro.Schema;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hive.serde2.io.
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BooleanWritable;
@@ -135,6 +137,27 @@ public class TestHBaseSerDe extends Test
       "  ]\n" +
       "}";
 
+  private static final String EXPECTED_DESERIALIZED_AVRO_STRING =
+      
"{\"key\":\"test-row1\",\"cola_avro\":{\"arecord\":{\"int1\":42,\"boolean1\":true,"
+          + "\"long1\":42432234234}}}";
+
+  private static final String EXPECTED_DESERIALIZED_AVRO_STRING_2 =
+ "{\"key\":\"test-row1\","
+      + "\"cola_avro\":{\"employeename\":\"Avro Employee1\","
+      + "\"employeeid\":11111,\"age\":25,\"gender\":\"FEMALE\","
+      + "\"contactinfo\":{\"address\":[{\"address1\":\"Avro First 
Address1\",\"address2\":"
+      + "\"Avro Second Address1\",\"city\":\"Avro 
City1\",\"zipcode\":123456,\"county\":"
+      + 
"{0:{\"areacode\":999,\"number\":1234567890}},\"aliases\":null,\"metadata\":"
+      + "{\"testkey\":\"testvalue\"}},{\"address1\":\"Avro First 
Address1\",\"address2\":"
+      + "\"Avro Second Address1\",\"city\":\"Avro 
City1\",\"zipcode\":123456,\"county\":"
+      + 
"{0:{\"areacode\":999,\"number\":1234567890}},\"aliases\":null,\"metadata\":"
+      + 
"{\"testkey\":\"testvalue\"}}],\"homephone\":{\"areacode\":999,\"number\":1234567890},"
+      + "\"officephone\":{\"areacode\":999,\"number\":1234455555}}}}";
+
+  private static final String EXPECTED_DESERIALIZED_AVRO_STRING_3 =
+      
"{\"key\":\"test-row1\",\"cola_avro\":{\"arecord\":{\"int1\":42,\"string1\":\"test\","
+          + "\"boolean1\":true,\"long1\":42432234234}}}";
+
   /**
    * Test the default behavior of the Lazy family of objects and object 
inspectors.
    */
@@ -1047,7 +1070,8 @@ public class TestHBaseSerDe extends Test
     Properties tbl = createPropertiesForHiveAvroSchemaInline();
     serDe.initialize(conf, tbl);
 
-    deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData);
+    deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData,
+        EXPECTED_DESERIALIZED_AVRO_STRING);
   }
 
   private Properties createPropertiesForHiveAvroSchemaInline() {
@@ -1092,7 +1116,8 @@ public class TestHBaseSerDe extends Test
     Properties tbl = createPropertiesForHiveAvroForwardEvolvedSchema();
     serDe.initialize(conf, tbl);
 
-    deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData);
+    deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData,
+        EXPECTED_DESERIALIZED_AVRO_STRING_3);
   }
 
   private Properties createPropertiesForHiveAvroForwardEvolvedSchema() {
@@ -1136,7 +1161,8 @@ public class TestHBaseSerDe extends Test
     Properties tbl = createPropertiesForHiveAvroBackwardEvolvedSchema();
     serDe.initialize(conf, tbl);
 
-    deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData);
+    deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData,
+        EXPECTED_DESERIALIZED_AVRO_STRING);
   }
 
   private Properties createPropertiesForHiveAvroBackwardEvolvedSchema() {
@@ -1185,7 +1211,8 @@ public class TestHBaseSerDe extends Test
     Properties tbl = createPropertiesForHiveAvroSerClass();
     serDe.initialize(conf, tbl);
 
-    deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData);
+    deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData,
+        EXPECTED_DESERIALIZED_AVRO_STRING_2);
   }
 
   private Properties createPropertiesForHiveAvroSerClass() {
@@ -1243,7 +1270,8 @@ public class TestHBaseSerDe extends Test
       Properties tbl = createPropertiesForHiveAvroSchemaUrl(onHDFS);
       serDe.initialize(conf, tbl);
 
-      deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData);
+      deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData,
+          EXPECTED_DESERIALIZED_AVRO_STRING);
     } finally {
       // Teardown the cluster
       if (miniDfs != null) {
@@ -1298,7 +1326,8 @@ public class TestHBaseSerDe extends Test
     Properties tbl = createPropertiesForHiveAvroExternalSchema();
     serDe.initialize(conf, tbl);
 
-    deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData);
+    deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData,
+        EXPECTED_DESERIALIZED_AVRO_STRING_2);
   }
 
   private Properties createPropertiesForHiveAvroExternalSchema() {
@@ -1389,8 +1418,87 @@ public class TestHBaseSerDe extends Test
     return tbl;
   }
 
+  public void testHBaseSerDeCustomStructValue() throws IOException, 
SerDeException {
+
+    byte[] cfa = "cola".getBytes();
+    byte[] qualStruct = "struct".getBytes();
+
+    TestStruct testStruct = new TestStruct("A", "B", "C", false, (byte) 0);
+    byte[] key = testStruct.getBytes();
+    // Data
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+    byte[] testData = testStruct.getBytes();
+    kvs.add(new KeyValue(key, cfa, qualStruct, testData));
+
+    Result r = new Result(kvs);
+    byte[] putKey = testStruct.getBytesWithDelimiters();
+
+    Put p = new Put(putKey);
+
+    // Post serialization, separators are automatically inserted between 
different fields in the
+    // struct. Currently there is no way to disable that. So the workaround 
here is to pad the
+    // data with the separator bytes before creating a "Put" object
+    p.add(new KeyValue(putKey, cfa, qualStruct, Bytes.padTail(testData, 2)));
+
+    // Create, initialize, and test the SerDe
+    HBaseSerDe serDe = new HBaseSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl = createPropertiesForValueStruct();
+    serDe.initialize(conf, tbl);
+
+    deserializeAndSerializeHBaseValueStruct(serDe, r, p);
+
+  }
+
+  private Properties createPropertiesForValueStruct() {
+    Properties tbl = new Properties();
+    tbl.setProperty("cola.struct.serialization.type", "struct");
+    tbl.setProperty("cola.struct.test.value", "test value");
+    tbl.setProperty(HBaseSerDe.HBASE_STRUCT_SERIALIZER_CLASS,
+        "org.apache.hadoop.hive.hbase.HBaseTestStructSerializer");
+    tbl.setProperty(serdeConstants.LIST_COLUMNS, "key,astring");
+    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES,
+        
"struct<col1:string,col2:string,col3:string>,struct<col1:string,col2:string,col3:string>");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key,cola:struct");
+    tbl.setProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS,
+        "org.apache.hadoop.hive.hbase.HBaseTestCompositeKey");
+    return tbl;
+  }
+
+  private void deserializeAndSerializeHBaseValueStruct(HBaseSerDe serDe, 
Result r, Put p)
+      throws SerDeException, IOException {
+    StructObjectInspector soi = (StructObjectInspector) 
serDe.getObjectInspector();
+
+    List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+
+    Object row = serDe.deserialize(new ResultWritable(r));
+
+    Object fieldData = null;
+    for (int j = 0; j < fieldRefs.size(); j++) {
+      fieldData = soi.getStructFieldData(row, fieldRefs.get(j));
+      assertNotNull(fieldData);
+      if (fieldData instanceof LazyStruct) {
+        assertEquals(((LazyStruct) fieldData).getField(0).toString(), "A");
+        assertEquals(((LazyStruct) fieldData).getField(1).toString(), "B");
+        assertEquals(((LazyStruct) fieldData).getField(2).toString(), "C");
+      } else {
+        Assert.fail("fieldData should be an instance of LazyStruct");
+      }
+    }
+
+    assertEquals(
+        
"{\"key\":{\"col1\":\"A\",\"col2\":\"B\",\"col3\":\"C\"},\"astring\":{\"col1\":\"A\",\"col2\":\"B\",\"col3\":\"C\"}}",
+        SerDeUtils.getJSONString(row, soi));
+
+    // Now serialize
+    Put put = ((PutWritable) serDe.serialize(row, soi)).getPut();
+
+    assertEquals("Serialized put:", p.toString(), put.toString());
+  }
+
   private void deserializeAndSerializeHiveAvro(HBaseSerDe serDe, Result r, Put 
p,
-      Object[] expectedFieldsData)
+      Object[] expectedFieldsData, String expectedDeserializedAvroString)
       throws SerDeException, IOException {
     StructObjectInspector soi = (StructObjectInspector) 
serDe.getObjectInspector();
 
@@ -1403,6 +1511,8 @@ public class TestHBaseSerDe extends Test
       assertNotNull(fieldData);
       assertEquals(expectedFieldsData[j], fieldData.toString().trim());
     }
+    
+    assertEquals(expectedDeserializedAvroString, SerDeUtils.getJSONString(row, 
soi));
 
     // Now serialize
     Put put = ((PutWritable) serDe.serialize(row, soi)).getPut();


Reply via email to