Author: brock
Date: Tue Sep 30 17:58:20 2014
New Revision: 1628502
URL: http://svn.apache.org/r1628502
Log:
HIVE-6148 - Support arbitrary structs stored in HBase (Swarnim Kulkarni via
Brock)
Added:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseStructValue.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/StructHBaseValueFactory.java
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestStructSerializer.java
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/AvroHBaseValueFactory.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseValueFactory.java
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
---
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
(original)
+++
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
Tue Sep 30 17:58:20 2014
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.hbase;
+import java.io.IOException;
+import java.util.Properties;
+
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
@@ -26,9 +29,6 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import java.io.IOException;
-import java.util.Properties;
-
public class DefaultHBaseKeyFactory extends AbstractHBaseKeyFactory implements
HBaseKeyFactory {
protected LazySimpleSerDe.SerDeParameters serdeParams;
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
---
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
(original)
+++
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
Tue Sep 30 17:58:20 2014
@@ -53,6 +53,7 @@ public class HBaseSerDe extends Abstract
public static final String HBASE_COMPOSITE_KEY_CLASS =
"hbase.composite.key.class";
public static final String HBASE_COMPOSITE_KEY_TYPES =
"hbase.composite.key.types";
public static final String HBASE_COMPOSITE_KEY_FACTORY =
"hbase.composite.key.factory";
+ public static final String HBASE_STRUCT_SERIALIZER_CLASS =
"hbase.struct.serialization.class";
public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";
@@ -98,7 +99,7 @@ public class HBaseSerDe extends Abstract
cachedHBaseRow = new LazyHBaseRow(
(LazySimpleStructObjectInspector) cachedObjectInspector,
- serdeParams.getKeyIndex(), serdeParams.getKeyFactory());
+ serdeParams.getKeyIndex(), serdeParams.getKeyFactory(),
serdeParams.getValueFactories());
serializer = new HBaseRowSerializer(serdeParams);
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
---
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java
(original)
+++
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeHelper.java
Tue Sep 30 17:58:20 2014
@@ -41,6 +41,10 @@ import org.apache.hadoop.hive.serde.serd
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import
org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.util.StringUtils;
@@ -371,6 +375,19 @@ public class HBaseSerDeHelper {
}
/**
+ * Create the {@link LazyObjectBase lazy field}
+ * */
+ public static LazyObjectBase createLazyField(ColumnMapping[] columnMappings,
int fieldID,
+ ObjectInspector inspector) {
+ ColumnMapping colMap = columnMappings[fieldID];
+ if (colMap.getQualifierName() == null && !colMap.isHbaseRowKey()) {
+ // a column family
+ return new LazyHBaseCellMap((LazyMapObjectInspector) inspector);
+ }
+ return LazyFactory.createLazyObject(inspector,
colMap.getBinaryStorage().get(0));
+ }
+
+ /**
* Auto-generates the key struct for composite keys
*
* @param compositeKeyParts map of composite key part name to its type.
Usually this would be
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
---
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
(original)
+++
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
Tue Sep 30 17:58:20 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.hbase.Colu
import org.apache.hadoop.hive.hbase.struct.AvroHBaseValueFactory;
import org.apache.hadoop.hive.hbase.struct.DefaultHBaseValueFactory;
import org.apache.hadoop.hive.hbase.struct.HBaseValueFactory;
+import org.apache.hadoop.hive.hbase.struct.StructHBaseValueFactory;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
@@ -204,11 +205,21 @@ public class HBaseSerDeParameters {
for (int i = 0; i < columnMappings.size(); i++) {
String serType = getSerializationType(conf, tbl,
columnMappings.getColumnsMapping()[i]);
- if (serType != null && serType.equals(AVRO_SERIALIZATION_TYPE)) {
+ if (AVRO_SERIALIZATION_TYPE.equals(serType)) {
Schema schema = getSchema(conf, tbl,
columnMappings.getColumnsMapping()[i]);
- valueFactories.add(new AvroHBaseValueFactory(schema));
+ valueFactories.add(new AvroHBaseValueFactory(i, schema));
+ } else if (STRUCT_SERIALIZATION_TYPE.equals(serType)) {
+ String structValueClassName =
tbl.getProperty(HBaseSerDe.HBASE_STRUCT_SERIALIZER_CLASS);
+
+ if (structValueClassName == null) {
+ throw new
IllegalArgumentException(HBaseSerDe.HBASE_STRUCT_SERIALIZER_CLASS
+ + " must be set for hbase columns of type [" +
STRUCT_SERIALIZATION_TYPE + "]");
+ }
+
+ Class<?> structValueClass = job.getClassByName(structValueClassName);
+ valueFactories.add(new StructHBaseValueFactory(i, structValueClass));
} else {
- valueFactories.add(new DefaultHBaseValueFactory());
+ valueFactories.add(new DefaultHBaseValueFactory(i));
}
}
} catch (Exception e) {
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
---
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
(original)
+++
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
Tue Sep 30 17:58:20 2014
@@ -20,15 +20,15 @@ package org.apache.hadoop.hive.hbase;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hive.hbase.struct.HBaseValueFactory;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
-import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
-import
org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
import
org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -47,18 +47,21 @@ public class LazyHBaseRow extends LazySt
private final int iKey;
private final HBaseKeyFactory keyFactory;
+ private final List<HBaseValueFactory> valueFactories;
public LazyHBaseRow(LazySimpleStructObjectInspector oi) {
- this(oi, -1, null);
+ this(oi, -1, null, null);
}
/**
* Construct a LazyHBaseRow object with the ObjectInspector.
*/
- public LazyHBaseRow(LazySimpleStructObjectInspector oi, int iKey,
HBaseKeyFactory keyFactory) {
+ public LazyHBaseRow(LazySimpleStructObjectInspector oi, int iKey,
HBaseKeyFactory keyFactory,
+ List<HBaseValueFactory> valueFactories) {
super(oi);
this.iKey = iKey;
this.keyFactory = keyFactory;
+ this.valueFactories = valueFactories;
}
/**
@@ -76,13 +79,14 @@ public class LazyHBaseRow extends LazySt
if (fieldID == iKey) {
return keyFactory.createKey(fieldRef.getFieldObjectInspector());
}
- ColumnMapping colMap = columnsMapping[fieldID];
- if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
- // a column family
- return new LazyHBaseCellMap((LazyMapObjectInspector)
fieldRef.getFieldObjectInspector());
+
+ if (valueFactories != null) {
+ return
valueFactories.get(fieldID).createValueObject(fieldRef.getFieldObjectInspector());
}
- return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector(),
- colMap.binaryStorage.get(0));
+
+ // fallback to default
+ return HBaseSerDeHelper.createLazyField(columnsMapping, fieldID,
+ fieldRef.getFieldObjectInspector());
}
/**
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/AvroHBaseValueFactory.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/AvroHBaseValueFactory.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
---
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/AvroHBaseValueFactory.java
(original)
+++
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/AvroHBaseValueFactory.java
Tue Sep 30 17:58:20 2014
@@ -48,7 +48,8 @@ public class AvroHBaseValueFactory exten
*
* @param schema the associated {@link Schema schema}
* */
- public AvroHBaseValueFactory(Schema schema) {
+ public AvroHBaseValueFactory(int fieldID, Schema schema) {
+ super(fieldID);
this.schema = schema;
}
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
---
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java
(original)
+++
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/DefaultHBaseValueFactory.java
Tue Sep 30 17:58:20 2014
@@ -21,9 +21,12 @@ import java.io.IOException;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.hbase.ColumnMappings;
+import org.apache.hadoop.hive.hbase.HBaseSerDeHelper;
import org.apache.hadoop.hive.hbase.HBaseSerDeParameters;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -35,15 +38,23 @@ import org.apache.hadoop.hive.serde2.typ
public class DefaultHBaseValueFactory implements HBaseValueFactory{
protected LazySimpleSerDe.SerDeParameters serdeParams;
+ protected ColumnMappings columnMappings;
protected HBaseSerDeParameters hbaseParams;
protected Properties properties;
protected Configuration conf;
+ private int fieldID;
+
+ public DefaultHBaseValueFactory(int fieldID) {
+ this.fieldID = fieldID;
+ }
+
@Override
public void init(HBaseSerDeParameters hbaseParams, Configuration conf,
Properties properties)
throws SerDeException {
this.hbaseParams = hbaseParams;
this.serdeParams = hbaseParams.getSerdeParams();
+ this.columnMappings = hbaseParams.getColumnMappings();
this.properties = properties;
this.conf = conf;
}
@@ -55,6 +66,11 @@ public class DefaultHBaseValueFactory im
1, serdeParams.getNullSequence(), serdeParams.isEscaped(),
serdeParams.getEscapeChar());
}
+ @Override
+ public LazyObjectBase createValueObject(ObjectInspector inspector) throws
SerDeException {
+ return
HBaseSerDeHelper.createLazyField(columnMappings.getColumnsMapping(), fieldID,
inspector);
+ }
+
@Override
public byte[] serializeValue(Object object, StructField field)
throws IOException {
Added:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseStructValue.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseStructValue.java?rev=1628502&view=auto
==============================================================================
---
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseStructValue.java
(added)
+++
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseStructValue.java
Tue Sep 30 17:58:20 2014
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.hbase.struct;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import
org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+
+/**
+ * This is an extension of {@link LazyStruct}. All value structs should extend this class and
+ * override the {@link LazyStruct#getField(int)} method, where fieldID corresponds to the ID of a
+ * value in the value structure.
+ * <p>
+ * For example, for a value structure <i>"/part1/part2/part3"</i>, <i>part1</i> will have an id
+ * <i>0</i>, <i>part2</i> will have an id <i>1</i> and <i>part3</i> will have an id <i>2</i>.
+ * Custom implementations of getField(fieldID) should return the value corresponding to that
+ * fieldID. So, for the above example, the value returned for <i>getField(0)</i> should be
+ * <i>part1</i>, <i>getField(1)</i> should be <i>part2</i> and <i>getField(2)</i> should be
+ * <i>part3</i>.
+ * </p>
+ * <p>
+ * All implementations are expected to have a constructor of the form <br>
+ *
+ * <pre>
+ * MyCustomStructObject(LazySimpleStructObjectInspector oi, Properties props, Configuration conf,
+ *     ColumnMapping colMap)
+ * </pre>
+ *
+ * </p>
+ * */
+public class HBaseStructValue extends LazyStruct {
+
+ /**
+ * The column family name
+ */
+ protected String familyName;
+
+ /**
+ * The column qualifier name
+ */
+ protected String qualifierName;
+
+ public HBaseStructValue(LazySimpleStructObjectInspector oi) {
+ super(oi);
+ }
+
+ /**
+ * Set the row data for this LazyStruct, then record the column family and qualifier names.
+ * The byte range is handed unchanged to the three-argument {@code init(bytes, start, length)}.
+ *
+ * @see LazyObject#init(ByteArrayRef, int, int)
+ *
+ * @param bytes passed through to {@code init(bytes, start, length)}
+ * @param start passed through to {@code init(bytes, start, length)}
+ * @param length passed through to {@code init(bytes, start, length)}
+ * @param familyName The column family name
+ * @param qualifierName The column qualifier name
+ */
+ public void init(ByteArrayRef bytes, int start, int length, String
familyName,
+ String qualifierName) {
+ init(bytes, start, length);
+ this.familyName = familyName;
+ this.qualifierName = qualifierName;
+ }
+
+ @Override
+ public ArrayList<Object> getFieldsAsList() {
+ ArrayList<Object> allFields = new ArrayList<Object>();
+
+ List<? extends StructField> fields = oi.getAllStructFieldRefs();
+
+ for (int i = 0; i < fields.size(); i++) {
+ allFields.add(getField(i));
+ }
+
+ return allFields;
+ }
+
+ /**
+ * Create and initialize a {@link LazyObject} with the given bytes for the given fieldID. The
+ * object is created from the object inspector of that field and initialized over the whole
+ * array (offset 0, full length).
+ *
+ * @param fieldID field for which the object is to be created
+ * @param bytes value with which the object is to be initialized
+ * @return initialized {@link LazyObject}
+ * */
+ public LazyObject<? extends ObjectInspector> toLazyObject(int fieldID,
byte[] bytes) {
+ ObjectInspector fieldOI =
oi.getAllStructFieldRefs().get(fieldID).getFieldObjectInspector();
+
+ LazyObject<? extends ObjectInspector> lazyObject =
LazyFactory.createLazyObject(fieldOI);
+
+ ByteArrayRef ref = new ByteArrayRef();
+
+ ref.setData(bytes);
+
+ // initialize the lazy object over the full byte array
+ lazyObject.init(ref, 0, ref.getData().length);
+
+ return lazyObject;
+ }
+}
\ No newline at end of file
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseValueFactory.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseValueFactory.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
---
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseValueFactory.java
(original)
+++
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/HBaseValueFactory.java
Tue Sep 30 17:58:20 2014
@@ -22,8 +22,10 @@ import java.io.IOException;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.hbase.HBaseKeyFactory;
import org.apache.hadoop.hive.hbase.HBaseSerDeParameters;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -53,6 +55,13 @@ public interface HBaseValueFactory {
ObjectInspector createValueObjectInspector(TypeInfo type) throws
SerDeException;
/**
+ * Create a custom object for an HBase value.
+ *
+ * @param inspector OI created by {@link HBaseKeyFactory#createKeyObjectInspector}
+ */
+ LazyObjectBase createValueObject(ObjectInspector inspector) throws
SerDeException;
+
+ /**
* Serialize the given hive object
*
* @param object the object to be serialized
Added:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/StructHBaseValueFactory.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/StructHBaseValueFactory.java?rev=1628502&view=auto
==============================================================================
---
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/StructHBaseValueFactory.java
(added)
+++
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/StructHBaseValueFactory.java
Tue Sep 30 17:58:20 2014
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.hbase.struct;
+
+import java.lang.reflect.Constructor;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import
org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * Implementation of {@link HBaseValueFactory} to consume a custom struct.
+ * <p>
+ * The constructor looks up, on the supplied struct value class, a declared constructor taking
+ * {@code (LazySimpleStructObjectInspector, Properties, Configuration, ColumnMapping)}.
+ * {@link #createValueObject(ObjectInspector)} invokes that constructor reflectively with the
+ * inspector, the factory's properties, the base configuration and the column mapping at this
+ * factory's field index; any exception raised while doing so is rethrown wrapped in a
+ * {@link SerDeException}.
+ * </p>
+ * <p>
+ * NOTE(review): the reflective constructor is held as a raw {@code Constructor} and its result
+ * is cast to {@code T}, which is an unchecked cast -- consider typing the field. The field index
+ * is stored here and is also passed to the superclass constructor.
+ * </p>
+ * */
+public class StructHBaseValueFactory<T extends HBaseStructValue> extends
DefaultHBaseValueFactory {
+
+ private final int fieldID;
+ private final Constructor constructor;
+
+ public StructHBaseValueFactory(int fieldID, Class<?> structValueClass)
throws Exception {
+ super(fieldID);
+ this.fieldID = fieldID;
+ this.constructor =
+
structValueClass.getDeclaredConstructor(LazySimpleStructObjectInspector.class,
+ Properties.class, Configuration.class, ColumnMapping.class);
+ }
+
+ @Override
+ public LazyObjectBase createValueObject(ObjectInspector inspector) throws
SerDeException {
+ try {
+ return (T) constructor.newInstance(inspector, properties,
hbaseParams.getBaseConfiguration(),
+ hbaseParams.getColumnMappings().getColumnsMapping()[fieldID]);
+ } catch (Exception e) {
+ throw new SerDeException(e);
+ }
+ }
+}
\ No newline at end of file
Added:
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestStructSerializer.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestStructSerializer.java?rev=1628502&view=auto
==============================================================================
---
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestStructSerializer.java
(added)
+++
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestStructSerializer.java
Tue Sep 30 17:58:20 2014
@@ -0,0 +1,75 @@
+package org.apache.hadoop.hive.hbase;
+
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hive.hbase.struct.HBaseStructValue;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import
org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * Test-specific implementation of
+ * {@link org.apache.hadoop.hive.serde2.lazy.LazyStruct}: field <i>i</i> of the struct is the
+ * character at index <i>i</i> of the value bytes after they are converted to a string and
+ * trimmed.
+ * <p>
+ * {@link #init(ByteArrayRef, int, int)} only captures the backing array; its start and length
+ * arguments are not used.
+ * </p>
+ * <p>
+ * NOTE(review): the string form of the bytes is computed on the first getField call and is not
+ * cleared by init, so an instance re-initialized with different bytes would presumably keep
+ * returning characters of the first value -- confirm whether instances are ever reused.
+ * </p>
+ */
+public class HBaseTestStructSerializer extends HBaseStructValue {
+
+ protected byte[] bytes;
+ protected String bytesAsString;
+ protected Properties tbl;
+ protected Configuration conf;
+ protected ColumnMapping colMapping;
+ protected String testValue;
+
+ public HBaseTestStructSerializer(LazySimpleStructObjectInspector oi,
Properties tbl,
+ Configuration conf, ColumnMapping colMapping) {
+ super(oi);
+ this.tbl = tbl;
+ this.conf = conf;
+ this.colMapping = colMapping;
+ }
+
+ @Override
+ public void init(ByteArrayRef bytes, int start, int length) {
+ this.bytes = bytes.getData();
+ }
+
+ @Override
+ public Object getField(int fieldID) {
+ if (bytesAsString == null) {
+ bytesAsString = Bytes.toString(bytes).trim();
+ }
+
+ // Pick the character at index fieldID (not actually random) and convert
it to byte array
+ byte[] fieldBytes = new byte[] { (byte) bytesAsString.charAt(fieldID) };
+
+ return toLazyObject(fieldID, fieldBytes);
+ }
+
+ /**
+ * Create and initialize a {@link LazyObject} with the given bytes for the given fieldID.
+ * <p>
+ * NOTE(review): this body matches the inherited
+ * {@link HBaseStructValue#toLazyObject(int, byte[])} statement for statement, so the override
+ * looks redundant.
+ * </p>
+ *
+ * @param fieldID field for which the object is to be created
+ * @param bytes value with which the object is to be initialized
+ *
+ * @return initialized {@link LazyObject}
+ * */
+ @Override
+ public LazyObject<? extends ObjectInspector> toLazyObject(int fieldID,
byte[] bytes) {
+ ObjectInspector fieldOI =
oi.getAllStructFieldRefs().get(fieldID).getFieldObjectInspector();
+
+ LazyObject<? extends ObjectInspector> lazyObject =
LazyFactory.createLazyObject(fieldOI);
+
+ ByteArrayRef ref = new ByteArrayRef();
+
+ ref.setData(bytes);
+
+ // initialize the lazy object over the full byte array
+ lazyObject.init(ref, 0, ref.getData().length);
+
+ return lazyObject;
+ }
+}
\ No newline at end of file
Modified:
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java?rev=1628502&r1=1628501&r2=1628502&view=diff
==============================================================================
---
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
(original)
+++
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
Tue Sep 30 17:58:20 2014
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.avro.Schema;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hive.serde2.io.
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BooleanWritable;
@@ -135,6 +137,27 @@ public class TestHBaseSerDe extends Test
" ]\n" +
"}";
+ private static final String EXPECTED_DESERIALIZED_AVRO_STRING =
+
"{\"key\":\"test-row1\",\"cola_avro\":{\"arecord\":{\"int1\":42,\"boolean1\":true,"
+ + "\"long1\":42432234234}}}";
+
+ private static final String EXPECTED_DESERIALIZED_AVRO_STRING_2 =
+ "{\"key\":\"test-row1\","
+ + "\"cola_avro\":{\"employeename\":\"Avro Employee1\","
+ + "\"employeeid\":11111,\"age\":25,\"gender\":\"FEMALE\","
+ + "\"contactinfo\":{\"address\":[{\"address1\":\"Avro First
Address1\",\"address2\":"
+ + "\"Avro Second Address1\",\"city\":\"Avro
City1\",\"zipcode\":123456,\"county\":"
+ +
"{0:{\"areacode\":999,\"number\":1234567890}},\"aliases\":null,\"metadata\":"
+ + "{\"testkey\":\"testvalue\"}},{\"address1\":\"Avro First
Address1\",\"address2\":"
+ + "\"Avro Second Address1\",\"city\":\"Avro
City1\",\"zipcode\":123456,\"county\":"
+ +
"{0:{\"areacode\":999,\"number\":1234567890}},\"aliases\":null,\"metadata\":"
+ +
"{\"testkey\":\"testvalue\"}}],\"homephone\":{\"areacode\":999,\"number\":1234567890},"
+ + "\"officephone\":{\"areacode\":999,\"number\":1234455555}}}}";
+
+ private static final String EXPECTED_DESERIALIZED_AVRO_STRING_3 =
+
"{\"key\":\"test-row1\",\"cola_avro\":{\"arecord\":{\"int1\":42,\"string1\":\"test\","
+ + "\"boolean1\":true,\"long1\":42432234234}}}";
+
/**
* Test the default behavior of the Lazy family of objects and object
inspectors.
*/
@@ -1047,7 +1070,8 @@ public class TestHBaseSerDe extends Test
Properties tbl = createPropertiesForHiveAvroSchemaInline();
serDe.initialize(conf, tbl);
- deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData);
+ deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData,
+ EXPECTED_DESERIALIZED_AVRO_STRING);
}
private Properties createPropertiesForHiveAvroSchemaInline() {
@@ -1092,7 +1116,8 @@ public class TestHBaseSerDe extends Test
Properties tbl = createPropertiesForHiveAvroForwardEvolvedSchema();
serDe.initialize(conf, tbl);
- deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData);
+ deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData,
+ EXPECTED_DESERIALIZED_AVRO_STRING_3);
}
private Properties createPropertiesForHiveAvroForwardEvolvedSchema() {
@@ -1136,7 +1161,8 @@ public class TestHBaseSerDe extends Test
Properties tbl = createPropertiesForHiveAvroBackwardEvolvedSchema();
serDe.initialize(conf, tbl);
- deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData);
+ deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData,
+ EXPECTED_DESERIALIZED_AVRO_STRING);
}
private Properties createPropertiesForHiveAvroBackwardEvolvedSchema() {
@@ -1185,7 +1211,8 @@ public class TestHBaseSerDe extends Test
Properties tbl = createPropertiesForHiveAvroSerClass();
serDe.initialize(conf, tbl);
- deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData);
+ deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData,
+ EXPECTED_DESERIALIZED_AVRO_STRING_2);
}
private Properties createPropertiesForHiveAvroSerClass() {
@@ -1243,7 +1270,8 @@ public class TestHBaseSerDe extends Test
Properties tbl = createPropertiesForHiveAvroSchemaUrl(onHDFS);
serDe.initialize(conf, tbl);
- deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData);
+ deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData,
+ EXPECTED_DESERIALIZED_AVRO_STRING);
} finally {
// Teardown the cluster
if (miniDfs != null) {
@@ -1298,7 +1326,8 @@ public class TestHBaseSerDe extends Test
Properties tbl = createPropertiesForHiveAvroExternalSchema();
serDe.initialize(conf, tbl);
- deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData);
+ deserializeAndSerializeHiveAvro(serDe, r, p, expectedFieldsData,
+ EXPECTED_DESERIALIZED_AVRO_STRING_2);
}
private Properties createPropertiesForHiveAvroExternalSchema() {
@@ -1389,8 +1418,87 @@ public class TestHBaseSerDe extends Test
return tbl;
}
+ public void testHBaseSerDeCustomStructValue() throws IOException,
SerDeException {
+
+ byte[] cfa = "cola".getBytes();
+ byte[] qualStruct = "struct".getBytes();
+
+ TestStruct testStruct = new TestStruct("A", "B", "C", false, (byte) 0);
+ byte[] key = testStruct.getBytes();
+ // Data
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+ byte[] testData = testStruct.getBytes();
+ kvs.add(new KeyValue(key, cfa, qualStruct, testData));
+
+ Result r = new Result(kvs);
+ byte[] putKey = testStruct.getBytesWithDelimiters();
+
+ Put p = new Put(putKey);
+
+ // Post serialization, separators are automatically inserted between different fields in the
+ // struct. Currently there is no way to disable that. So the workaround here is to pad the
+ // data with the separator bytes before creating a "Put" object
+ p.add(new KeyValue(putKey, cfa, qualStruct, Bytes.padTail(testData, 2)));
+
+ // Create, initialize, and test the SerDe
+ HBaseSerDe serDe = new HBaseSerDe();
+ Configuration conf = new Configuration();
+ Properties tbl = createPropertiesForValueStruct();
+ serDe.initialize(conf, tbl);
+
+ deserializeAndSerializeHBaseValueStruct(serDe, r, p);
+
+ }
+
+ private Properties createPropertiesForValueStruct() {
+ Properties tbl = new Properties();
+ tbl.setProperty("cola.struct.serialization.type", "struct");
+ tbl.setProperty("cola.struct.test.value", "test value");
+ tbl.setProperty(HBaseSerDe.HBASE_STRUCT_SERIALIZER_CLASS,
+ "org.apache.hadoop.hive.hbase.HBaseTestStructSerializer");
+ tbl.setProperty(serdeConstants.LIST_COLUMNS, "key,astring");
+ tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES,
+
"struct<col1:string,col2:string,col3:string>,struct<col1:string,col2:string,col3:string>");
+ tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key,cola:struct");
+ tbl.setProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS,
+ "org.apache.hadoop.hive.hbase.HBaseTestCompositeKey");
+ return tbl;
+ }
+
+ private void deserializeAndSerializeHBaseValueStruct(HBaseSerDe serDe,
Result r, Put p)
+ throws SerDeException, IOException {
+ StructObjectInspector soi = (StructObjectInspector)
serDe.getObjectInspector();
+
+ List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+
+ Object row = serDe.deserialize(new ResultWritable(r));
+
+ Object fieldData = null;
+ for (int j = 0; j < fieldRefs.size(); j++) {
+ fieldData = soi.getStructFieldData(row, fieldRefs.get(j));
+ assertNotNull(fieldData);
+ if (fieldData instanceof LazyStruct) {
+ assertEquals(((LazyStruct) fieldData).getField(0).toString(), "A");
+ assertEquals(((LazyStruct) fieldData).getField(1).toString(), "B");
+ assertEquals(((LazyStruct) fieldData).getField(2).toString(), "C");
+ } else {
+ Assert.fail("fieldData should be an instance of LazyStruct");
+ }
+ }
+
+ assertEquals(
+
"{\"key\":{\"col1\":\"A\",\"col2\":\"B\",\"col3\":\"C\"},\"astring\":{\"col1\":\"A\",\"col2\":\"B\",\"col3\":\"C\"}}",
+ SerDeUtils.getJSONString(row, soi));
+
+ // Now serialize
+ Put put = ((PutWritable) serDe.serialize(row, soi)).getPut();
+
+ assertEquals("Serialized put:", p.toString(), put.toString());
+ }
+
private void deserializeAndSerializeHiveAvro(HBaseSerDe serDe, Result r, Put
p,
- Object[] expectedFieldsData)
+ Object[] expectedFieldsData, String expectedDeserializedAvroString)
throws SerDeException, IOException {
StructObjectInspector soi = (StructObjectInspector)
serDe.getObjectInspector();
@@ -1403,6 +1511,8 @@ public class TestHBaseSerDe extends Test
assertNotNull(fieldData);
assertEquals(expectedFieldsData[j], fieldData.toString().trim());
}
+
+ assertEquals(expectedDeserializedAvroString, SerDeUtils.getJSONString(row,
soi));
// Now serialize
Put put = ((PutWritable) serDe.serialize(row, soi)).getPut();