Author: alfonsonishikawa Date: Tue May 7 23:19:03 2013 New Revision: 1480130
URL: http://svn.apache.org/r1480130 Log: GORA-207: Fixing some bugs. This is the last patch about GORA-207 (hopefully). Full support of unions (2 types unions, 3 types unions,...) : - Support of ["null",type] (a.k.a. optional field). - Support for multitype (3+) unions. - Support of nested unions. - Support of recursive optional records. - Support of unions as value in maps and arrays. - Serialization of topmost optional fields of the main record in "raw": topmost ["null","type"] (optional field) will be persisted as if it were ["type"] (and non-existent column === null). This ensures data from 0.2.1 can be read. Modified: gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java Modified: gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1480130&r1=1480129&r2=1480130&view=diff ============================================================================== --- gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original) +++ gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Tue May 7 23:19:03 2013 @@ -200,6 +200,17 @@ implements Configurable { } } + /** + * {@inheritDoc} + * Serializes the Persistent data and saves in HBase. + * Topmost fields of the record are persisted in "raw" format (not avro serialized). This behavior happens + * in maps and arrays too. + * + * ["null","type"] type (a.k.a. optional field) is persisted like as if it is ["type"], but the column get + * deleted if value==null (so value read after will be null). 
+ * + * @param persistent Record to be persisted in HBase + */ @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void put(K key, T persistent) { @@ -234,8 +245,14 @@ implements Configurable { case DIRTY: byte[] qual = Bytes.toBytes(mapKey.toString()); byte[] val = toBytes(map.get(mapKey), field.schema().getValueType()); - put.add(hcol.getFamily(), qual, val); - hasPuts = true; + // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete + if (val == null) { // value == null => must delete the column + delete.deleteColumn(hcol.getFamily(), qual); + hasDeletes = true; + } else { + put.add(hcol.getFamily(), qual, val); + hasPuts = true; + } break; case DELETED: qual = Bytes.toBytes(mapKey.toString()); @@ -248,9 +265,15 @@ implements Configurable { Set<Map.Entry> set = ((Map)o).entrySet(); for(Entry entry: set) { byte[] qual = toBytes(entry.getKey()); - byte[] val = toBytes(entry.getValue()); - put.add(hcol.getFamily(), qual, val); - hasPuts = true; + byte[] val = toBytes(entry.getValue(), field.schema().getValueType()); + // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete + if (val == null) { // value == null => must delete the column + delete.deleteColumn(hcol.getFamily(), qual); + hasDeletes = true; + } else { + put.add(hcol.getFamily(), qual, val); + hasPuts = true; + } } } break; @@ -259,15 +282,28 @@ implements Configurable { GenericArray arr = (GenericArray) o; int j=0; for(Object item : arr) { - byte[] val = toBytes(item); - put.add(hcol.getFamily(), Bytes.toBytes(j++), val); - hasPuts = true; + byte[] val = toBytes(item, field.schema().getElementType()); + // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. 
"null"=>delete + if (val == null) { // value == null => must delete the column + delete.deleteColumn(hcol.getFamily(), Bytes.toBytes(j++)); + hasDeletes = true; + } else { + put.add(hcol.getFamily(), Bytes.toBytes(j++), val); + hasPuts = true; + } } } break; default: - put.add(hcol.getFamily(), hcol.getQualifier(), toBytes(o, field.schema())); - hasPuts = true; + // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete + byte[] serializedBytes = toBytes(o, field.schema()) ; + if (serializedBytes == null) { // value == null => must delete the column + delete.deleteColumn(hcol.getFamily(), hcol.getQualifier()); + hasDeletes = true; + } else { + put.add(hcol.getFamily(), hcol.getQualifier(), serializedBytes); + hasPuts = true; + } break; } } @@ -563,8 +599,7 @@ implements Configurable { setField(persistent, field, arr); break; default: - byte[] val = - result.getValue(col.getFamily(), col.getQualifier()); + byte[] val = result.getValue(col.getFamily(), col.getQualifier()); if (val == null) { continue; } Modified: gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1480130&r1=1480129&r2=1480130&view=diff ============================================================================== --- gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java (original) +++ gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java Tue May 7 23:19:03 2013 @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericData; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DecoderFactory; @@ -88,7 +89,17 @@ public class HBaseByteInterface { }; }; - + /** 
+ * Deserializes an array of bytes matching the given schema to the proper basic (enum, Utf8,...) or + * complex type (Persistent/Record). + * + * Does not handle <code>arrays/maps</code> if not inside a <code>record</code> type. + * + * @param schema Avro schema describing the expected data + * @param val array of bytes with the data serialized + * @return Enum|Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Persistent|Null + * @throws IOException + */ @SuppressWarnings("rawtypes") public static Object fromBytes(Schema schema, byte[] val) throws IOException { Type type = schema.getType(); @@ -102,12 +113,43 @@ public class HBaseByteInterface { case DOUBLE: return Bytes.toDouble(val); case BOOLEAN: return val[0] != 0; case UNION: + // XXX Special case: When reading the top-level field of a record we must handle the + // special case ["null","type"] definitions: this will be written as if it was ["type"] + // if not in a special case, will execute "case RECORD". + + // if 'val' is empty we ignore the special case (will match Null in "case RECORD") + if (schema.getTypes().size() == 2) { + + // schema [type0, type1] + Type type0 = schema.getTypes().get(0).getType() ; + Type type1 = schema.getTypes().get(1).getType() ; + + // Check if types are different and there's a "null", like ["null","type"] or ["type","null"] + if (!type0.equals(type1) + && ( type0.equals(Schema.Type.NULL) + || type1.equals(Schema.Type.NULL))) { + + if (type0.equals(Schema.Type.NULL)) + schema = schema.getTypes().get(1) ; + else + schema = schema.getTypes().get(0) ; + + return fromBytes(schema, val) ; // Deserialize as if schema was ["type"] + } + + } + // else + // type = [type0,type1] where type0=type1 + // or val == null + // => deserialize like "case RECORD" + case RECORD: Map<String, SpecificDatumReader<?>> readerMap = readerMaps.get(); PersistentDatumReader<?> reader = null ; - // For UNION schemas, must use a specific (UNION-type-type-type) - // since unions don't have own name + // For 
UNION schemas, must use a specific PersistentDatumReader + // from the readerMap since unions don't have own name + // (key name in map will be "UNION-type-type-...") if (schema.getType().equals(Schema.Type.UNION)) { reader = (PersistentDatumReader<?>)readerMap.get(String.valueOf(schema.hashCode())); if (reader == null) { @@ -137,6 +179,13 @@ public class HBaseByteInterface { } } + /** + * Converts an array of bytes to the target <em>basic class</em>. + * @param clazz (Byte|Boolean|Short|Integer|Long|Float|Double|String|Utf8).class + * @param val array of bytes with the value + * @return an instance of <code>clazz</code> with the bytes in <code>val</code> + * deserialized with org.apache.hadoop.hbase.util.Bytes + */ @SuppressWarnings("unchecked") public static <K> K fromBytes(Class<K> clazz, byte[] val) { if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { @@ -161,6 +210,11 @@ public class HBaseByteInterface { throw new RuntimeException("Can't parse data as class: " + clazz); } + /** + * Converts an instance of a <em>basic class</em> to an array of bytes. + * @param o Instance of Enum|Byte|Boolean|Short|Integer|Long|Float|Double|String|Utf8 + * @return array of bytes with <code>o</code> serialized with org.apache.hadoop.hbase.util.Bytes + */ public static byte[] toBytes(Object o) { Class<?> clazz = o.getClass(); if (clazz.equals(Enum.class)) { @@ -187,6 +241,14 @@ public class HBaseByteInterface { throw new RuntimeException("Can't parse data as class: " + clazz); } + /** + * Serializes an object following the given schema. 
+ * Does not handle <code>array/map</code> if it is not inside a <code>record</code> + * @param o Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Enum|Persistent + * @param schema The schema describing the object (or a compatible description) + * @return array of bytes of the serialized object + * @throws IOException + */ @SuppressWarnings({ "rawtypes", "unchecked" }) public static byte[] toBytes(Object o, Schema schema) throws IOException { Type type = schema.getType(); @@ -200,11 +262,40 @@ public class HBaseByteInterface { case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0}; case ENUM: return new byte[] { (byte)((Enum<?>) o).ordinal() }; case UNION: + // XXX Special case: When writing the top-level field of a record we must handle the + // special case ["null","type"] definitions: this will be written as if it was ["type"] + // if not in a special case, will execute "case RECORD". + + if (schema.getTypes().size() == 2) { + + // schema [type0, type1] + Type type0 = schema.getTypes().get(0).getType() ; + Type type1 = schema.getTypes().get(1).getType() ; + + // Check if types are different and there's a "null", like ["null","type"] or ["type","null"] + if (!type0.equals(type1) + && ( type0.equals(Schema.Type.NULL) + || type1.equals(Schema.Type.NULL))) { + + if (o == null) return null ; + + int index = GenericData.get().resolveUnion(schema, o); + schema = schema.getTypes().get(index) ; + + return toBytes(o, schema) ; // Serialize as if schema was ["type"] + } + + } + // else + // type = [type0,type1] where type0=type1 + // => Serialize like "case RECORD" with Avro + case RECORD: Map<String, SpecificDatumWriter<?>> writerMap = writerMaps.get(); PersistentDatumWriter writer = null ; - // For UNION schemas, must use a specific (UNION-type-type-type) - // since unions don't have own name + // For UNION schemas, must use a specific PersistentDatumReader + // from the readerMap since unions don't have own name + // (key name in map will be 
"UNION-type-type-...") if (schema.getType().equals(Schema.Type.UNION)) { writer = (PersistentDatumWriter<?>) writerMap.get(String.valueOf(schema.hashCode())); if (writer == null) { Modified: gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java?rev=1480130&r1=1480129&r2=1480130&view=diff ============================================================================== --- gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java (original) +++ gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java Tue May 7 23:19:03 2013 @@ -19,21 +19,29 @@ package org.apache.gora.hbase.store; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Properties; import junit.framework.Assert; +import org.apache.avro.util.Utf8; +import org.apache.commons.lang.ArrayUtils; +import org.apache.gora.examples.WebPageDataCreator; import org.apache.gora.examples.generated.Employee; import org.apache.gora.examples.generated.WebPage; import org.apache.gora.hbase.GoraHBaseTestDriver; import org.apache.gora.store.DataStore; import org.apache.gora.store.DataStoreFactory; import org.apache.gora.store.DataStoreTestBase; +import org.apache.gora.store.DataStoreTestUtil; +import org.apache.gora.util.GoraException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; /** * Test case for HBaseStore. @@ -94,21 +102,114 @@ public class TestHBaseStore extends Data } + /** + * Asserts that writing bytes actually works at low level in HBase. + * Checks writing null unions too. 
+ */ @Override public void assertPutBytes(byte[] contentBytes) throws IOException { + // Check first the parameter "contentBytes" if written+read right. + HTable table = new HTable("WebPage"); + Get get = new Get(Bytes.toBytes("com.example/http")); + org.apache.hadoop.hbase.client.Result result = table.get(get); + + byte[] actualBytes = result.getValue(Bytes.toBytes("content"), null); + Assert.assertNotNull(actualBytes); + Assert.assertTrue(Arrays.equals(contentBytes, actualBytes)); + table.close(); + // Since "content" is an optional field, we are forced to reopen the DataStore // to retrieve the union correctly - webPageStore = testDriver.createDataStore(String.class, WebPage.class); + // Test writing+reading a null value. FIELD in HBASE MUST become DELETED WebPage page = webPageStore.get("com.example/http") ; - byte[] actualBytes = page.getContent().array() ; + page.setContent(null) ; + webPageStore.put("com.example/http", page) ; + webPageStore.close() ; + webPageStore = testDriver.createDataStore(String.class, WebPage.class); + page = webPageStore.get("com.example/http") ; + Assert.assertNull(page.getContent()) ; + // Check directly with HBase + table = new HTable("WebPage"); + get = new Get(Bytes.toBytes("com.example/http")); + result = table.get(get); + actualBytes = result.getValue(Bytes.toBytes("content"), null); + Assert.assertNull(actualBytes); + table.close(); + // Test writing+reading an empty bytes field. 
FIELD in HBASE MUST become EMPTY (byte[0]) + page = webPageStore.get("com.example/http") ; + page.setContent(ByteBuffer.wrap("".getBytes())) ; + webPageStore.put("com.example/http", page) ; webPageStore.close() ; - + webPageStore = testDriver.createDataStore(String.class, WebPage.class); + page = webPageStore.get("com.example/http") ; + Assert.assertTrue(Arrays.equals("".getBytes(),page.getContent().array())) ; + // Check directly with HBase + table = new HTable("WebPage"); + get = new Get(Bytes.toBytes("com.example/http")); + result = table.get(get); + actualBytes = result.getValue(Bytes.toBytes("content"), null); Assert.assertNotNull(actualBytes); - Assert.assertTrue(Arrays.equals(contentBytes, actualBytes)); + Assert.assertEquals(0, actualBytes.length) ; + table.close(); + + } + + /** + * Checks that when writing a top level union <code>['null','type']</code> the value is written in raw format + * @throws Exception + */ + @Test + public void assertTopLevelUnions() throws Exception { + WebPage page = webPageStore.newPersistent(); + + // Write webpage data + page.setUrl(new Utf8("http://example.com")); + byte[] contentBytes = "example content in example.com".getBytes(); + ByteBuffer buff = ByteBuffer.wrap(contentBytes); + page.setContent(buff); + webPageStore.put("com.example/http", page); + webPageStore.flush() ; + + // Read directly from HBase + HTable table = new HTable("WebPage"); + Get get = new Get(Bytes.toBytes("com.example/http")); + org.apache.hadoop.hbase.client.Result result = table.get(get); + + byte[] bytesRead = result.getValue(Bytes.toBytes("content"), null); + + Assert.assertNotNull(bytesRead) ; + Assert.assertTrue(Arrays.equals(bytesRead, contentBytes)); + } + + /** + * Checks that when writing a top level union <code>['null','type']</code> with the option <code>RAW_ROOT_FIELDS_OPTION=true</code> + * the column is not created, and when <code>RAW_ROOT_FIELDS_OPTION=false</code> the <code>null</code> value is serialized + * with Avro. 
+ * @throws Exception + */ + @Test + public void assertTopLevelUnionsNull() throws Exception { + WebPage page = webPageStore.newPersistent(); + + // Write webpage data + page.setUrl(new Utf8("http://example.com")); + page.setContent(null); // This won't change internal field status to dirty, so + page.setDirty("content") ; // need to change it manually + webPageStore.put("com.example/http", page); + webPageStore.flush() ; + + // Read directly from HBase + HTable table = new HTable("WebPage"); + Get get = new Get(Bytes.toBytes("com.example/http")); + org.apache.hadoop.hbase.client.Result result = table.get(get); + + byte[] contentBytes = result.getValue(Bytes.toBytes("content"), null); + Assert.assertNull(webPageStore.get("com.example/http", new String[]{"content"})) ; + Assert.assertTrue(contentBytes == null || contentBytes.length == 0) ; } @Override
