Author: lewismc
Date: Wed Feb 26 17:27:44 2014
New Revision: 1572170
URL: http://svn.apache.org/r1572170
Log:
GORA-94v11 and GORA-246v5
Modified:
gora/branches/GORA_94/gora-core/src/examples/avro/employee.json
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
gora/branches/GORA_94/gora-hbase/src/test/conf/gora-hbase-mapping.xml
Modified: gora/branches/GORA_94/gora-core/src/examples/avro/employee.json
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/examples/avro/employee.json?rev=1572170&r1=1572169&r2=1572170&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/examples/avro/employee.json (original)
+++ gora/branches/GORA_94/gora-core/src/examples/avro/employee.json Wed Feb 26
17:27:44 2014
@@ -17,7 +17,8 @@
{"name": "url", "type": ["null","string"], "default":null},
{"name": "content", "type": ["null","bytes"],"default":null},
{"name": "parsedContent", "type": {"type":"array", "items":
"string"},"default":{}},
- {"name": "outlinks", "type": {"type":"map",
"values":"string"},"default":{}},
+ {"name": "outlinks", "type": {"type":"map", "values":["null",
"string"]},"default":{}},
+ {"name": "headers", "type": ["null", {"type": "map", "values":
["null", "string"]}],"default":null},
{"name": "metadata", "default":null, "type": {
"name": "Metadata",
"type": "record",
Modified:
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java?rev=1572170&r1=1572169&r2=1572170&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java
(original)
+++
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java
Wed Feb 26 17:27:44 2014
@@ -210,7 +210,11 @@ public abstract class DataStoreTestBase
public void testUpdate() throws IOException, Exception {
log.info("test method: testUpdate");
DataStoreTestUtil.testUpdateEmployee(employeeStore);
- DataStoreTestUtil.testUpdateWebPage(webPageStore);
+ // The WebPage helpers below run in sequence against the same store and all
+ // write the same seven URL row keys (a.com ... g.com). Nothing here removes
+ // rows between them, so a helper may observe data left by an earlier one.
+ // NOTE(review): confirm createSchema() does not truncate an existing table.
+ DataStoreTestUtil.testUpdateWebPagePutToArray(webPageStore);
+ DataStoreTestUtil.testUpdateWebPagePutToNotNullableMap(webPageStore);
+ DataStoreTestUtil.testUpdateWebPagePutToNullableMap(webPageStore);
+ DataStoreTestUtil.testUpdateWebPageRemoveMapEntry(webPageStore);
+ DataStoreTestUtil.testUpdateWebPageRemoveField(webPageStore);
}
public void testEmptyUpdate() throws IOException, Exception {
Modified:
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java?rev=1572170&r1=1572169&r2=1572170&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
(original)
+++
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
Wed Feb 26 17:27:44 2014
@@ -232,7 +232,7 @@ public class DataStoreTestUtil {
Employee employee = DataStoreTestUtil.createEmployee(dataStore);
employee.setBoss(new Utf8("Real boss")) ;
-
+
String ssn = employee.getSsn().toString();
dataStore.put(ssn, employee);
dataStore.flush();
@@ -313,7 +313,7 @@ public class DataStoreTestUtil {
if (employee.getBoss() instanceof Utf8) {
String beforeBoss = employee.getBoss().toString();
String afterBoss = after.getBoss().toString();
- assertEquals("Boss String field values in UNION should be the same",
+ assertEquals("Boss String field values in UNION should be the same",
beforeBoss, afterBoss);
} else {
Employee beforeBoss = (Employee) employee.getBoss();
@@ -480,15 +480,14 @@ public class DataStoreTestUtil {
* @throws IOException
* @throws Exception
*/
- public static void testUpdateWebPage(DataStore<String, WebPage> dataStore)
+ public static void testUpdateWebPagePutToArray(DataStore<String, WebPage>
dataStore)
throws IOException, Exception {
dataStore.createSchema();
String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
- "http://d.com/d", "http://e.com/e", "http://f.com/f",
"http://g.com/g"};
+ "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g"
};
String content = "content";
String parsedContent = "parsedContent";
- String anchor = "anchor";
int parsedContentCount = 0;
@@ -496,13 +495,9 @@ public class DataStoreTestUtil {
for (int i = 0; i < urls.length; i++) {
WebPage webPage = WebPage.newBuilder().build();
webPage.setUrl(new Utf8(urls[i]));
- webPage.setParsedContent(new ArrayList<CharSequence>());
for (parsedContentCount = 0; parsedContentCount < 5;
parsedContentCount++) {
webPage.getParsedContent().add(new Utf8(parsedContent + i + "," +
parsedContentCount));
}
- for (int j = 0; j < urls.length; j += 2) {
- webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
- }
dataStore.put(webPage.getUrl().toString(), webPage);
}
@@ -514,11 +509,49 @@ public class DataStoreTestUtil {
for (parsedContentCount = 5; parsedContentCount < 10;
parsedContentCount++) {
webPage.getParsedContent().add(new Utf8(parsedContent + i + "," +
parsedContentCount));
}
+ dataStore.put(webPage.getUrl().toString(), webPage);
+ }
+
+ dataStore.flush();
+
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = dataStore.get(urls[i]);
+ assertEquals(content + i, ByteUtils.toString(
toByteArray(webPage.getContent()) ));
+ assertEquals(10, webPage.getParsedContent().size());
+ int j = 0;
+ for (CharSequence pc : webPage.getParsedContent()) {
+ assertEquals(parsedContent + i + "," + j, pc.toString());
+ j++;
+ }
+ }
+ }
+
+ public static void testUpdateWebPagePutToNotNullableMap(DataStore<String,
WebPage> dataStore)
+ throws IOException, Exception {
+ dataStore.createSchema();
+
+ String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+ "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g"
};
+ String anchor = "anchor";
+
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = WebPage.newBuilder().build();
+ webPage.setUrl(new Utf8(urls[i]));
+ for (int j = 0; j < urls.length; j += 2) {
+ webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
+ }
+ dataStore.put(webPage.getUrl().toString(), webPage);
+ }
+
+ dataStore.flush();
+
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = dataStore.get(urls[i]);
webPage.getOutlinks().clear();
for (int j = 1; j < urls.length; j += 2) {
webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
}
- //test for double put of same entries
+ // test for double put of same entries
for (int j = 1; j < urls.length; j += 2) {
webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
}
@@ -529,15 +562,9 @@ public class DataStoreTestUtil {
for (int i = 0; i < urls.length; i++) {
WebPage webPage = dataStore.get(urls[i]);
- assertEquals(content + i, ByteUtils.toString(
toByteArray(webPage.getContent()) ));
- assertEquals(10, webPage.getParsedContent().size());
int j = 0;
- for (CharSequence pc : webPage.getParsedContent()) {
- assertEquals(parsedContent + i + "," + j, pc.toString());
- j++;
- }
int count = 0;
- for (j = 0; j < urls.length; j++) {
+ for (j = 0; j < urls.length; j++) { //TODO j++ or j+=2 ?
CharSequence link = webPage.getOutlinks().get(new Utf8(anchor + j));
assertNotNull(link);
assertEquals(urls[j], link.toString());
@@ -545,11 +572,36 @@ public class DataStoreTestUtil {
}
assertEquals(count, webPage.getOutlinks().size());
}
+ }
+
+ public static void testUpdateWebPagePutToNullableMap(DataStore<String,
WebPage> dataStore)
+ throws IOException, Exception {
+ dataStore.createSchema();
+
+ String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+ "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g"
};
+ String header = "header";
+ String[] headers = { "firstHeader", "secondHeader", "thirdHeader",
+ "fourthHeader", "fifthHeader", "sixthHeader" };
+
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = WebPage.newBuilder().build();
+ webPage.setUrl(new Utf8(urls[i]));
+ //test put for nullable map field
+ webPage.setHeaders(new HashMap<CharSequence, CharSequence>());
+ for (int j = 0; j < headers.length; j += 2) {
+ webPage.getHeaders().put(new Utf8(header + j), new Utf8(headers[j]));
+ }
+ dataStore.put(webPage.getUrl().toString(), webPage);
+ }
+
+ dataStore.flush();
for (int i = 0; i < urls.length; i++) {
WebPage webPage = dataStore.get(urls[i]);
- for (int j = 0; j < urls.length; j += 2) {
- webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
+ webPage.getHeaders().clear(); //TODO clear method does not work
+ for (int j = 1; j < headers.length; j += 2) {
+ webPage.getHeaders().put(new Utf8(header + j), new Utf8(headers[j]));
}
dataStore.put(webPage.getUrl().toString(), webPage);
}
@@ -558,13 +610,95 @@ public class DataStoreTestUtil {
for (int i = 0; i < urls.length; i++) {
WebPage webPage = dataStore.get(urls[i]);
+ int j = 0;
int count = 0;
+ for (j = 0; j < headers.length; j++) { //TODO j++ or j+=2 ?
+ CharSequence headerSample = webPage.getHeaders().get(new Utf8(header +
j));
+ assertNotNull(headerSample);
+ assertEquals(headers[j], headerSample.toString());
+ count++;
+ }
+ assertEquals(count, webPage.getHeaders().size());
+ }
+ }
+
+ /**
+ * Verifies that putting a null value for an outlinks key and re-putting the
+ * record removes that entry from the store, and that only those entries
+ * disappear (checked via the map size). outlinks values are declared as
+ * ["null","string"] in the example schema, so null is a legal value.
+ */
+ public static void testUpdateWebPageRemoveMapEntry(DataStore<String,
WebPage> dataStore)
+ throws IOException, Exception {
+ dataStore.createSchema();
+
+ String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+ "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g"
};
+ String anchor = "anchor";
+
+ // Seed every page with one outlink per URL: anchor0 .. anchor6.
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = WebPage.newBuilder().build();
+ webPage.setUrl(new Utf8(urls[i]));
for (int j = 0; j < urls.length; j++) {
+ webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
+ }
+ dataStore.put(webPage.getUrl().toString(), webPage);
+ }
+
+ dataStore.flush();
+
+ // map entry removal test
+ // A null value marks the odd-numbered entries for removal.
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = dataStore.get(urls[i]);
+ for (int j = 1; j < urls.length; j += 2) {
+ webPage.getOutlinks().put(new Utf8(anchor + j), null);
+ }
+ dataStore.put(webPage.getUrl().toString(), webPage);
+ }
+
+ dataStore.flush();
+
+ // Odd anchors must now be absent; the remaining (urls.length - count)
+ // even anchors must still be there.
+ for (int i = 0; i < urls.length; i++) {
+ int count = 0;
+ WebPage webPage = dataStore.get(urls[i]);
+ for (int j = 1; j < urls.length; j += 2) {
CharSequence link = webPage.getOutlinks().get(new Utf8(anchor + j));
- assertNotNull(link);
- assertEquals(urls[j], link.toString());
+ assertNull(link);
count++;
}
+ assertEquals(urls.length - count, webPage.getOutlinks().size());
+ }
+ }
+
+ /**
+ * Verifies that setting the nullable map field "headers" to null and
+ * re-putting the record removes the whole field, so it reads back as null.
+ */
+ public static void testUpdateWebPageRemoveField(DataStore<String, WebPage> dataStore)
+ throws IOException, Exception {
+ dataStore.createSchema();
+
+ String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+ "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
+ String header = "header";
+ String[] headers = { "firstHeader", "secondHeader", "thirdHeader",
+ "fourthHeader", "fifthHeader", "sixthHeader" };
+
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = WebPage.newBuilder().build();
+ webPage.setUrl(new Utf8(urls[i]));
+ webPage.setHeaders(new HashMap<CharSequence, CharSequence>());
+ for (int j = 0; j < headers.length; j++) {
+ webPage.getHeaders().put(new Utf8(header + j), new Utf8(headers[j]));
+ }
+ dataStore.put(webPage.getUrl().toString(), webPage);
+ }
+
+ dataStore.flush();
+
+ // Precondition: the headers really were stored. Without this check the
+ // final assertNull() would also pass if put() never wrote the field.
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = dataStore.get(urls[i]);
+ assertNotNull(webPage.getHeaders());
+ for (int j = 0; j < headers.length; j++) {
+ CharSequence value = webPage.getHeaders().get(new Utf8(header + j));
+ assertNotNull(value);
+ assertEquals(headers[j], value.toString());
+ }
+ }
+
+ // nullable map field removal test
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = dataStore.get(urls[i]);
+ webPage.setHeaders(null);
+ dataStore.put(webPage.getUrl().toString(), webPage);
+ }
+
+ dataStore.flush();
+
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = dataStore.get(urls[i]);
+ assertNull(webPage.getHeaders());
}
}
Modified:
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1572170&r1=1572169&r2=1572170&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
(original)
+++
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
Wed Feb 26 17:27:44 2014
@@ -224,97 +224,110 @@ implements Configurable {
}
/**
- * {@inheritDoc}
- * Serializes the Persistent data and saves in HBase.
- * Topmost fields of the record are persisted in "raw" format (not avro
serialized). This behavior happens
- * in maps and arrays too.
+ * {@inheritDoc} Serializes the Persistent data and saves in HBase. Topmost
+ * fields of the record are persisted in "raw" format (not avro serialized).
+ * This behavior happens in maps and arrays too.
*
- * ["null","type"] type (a.k.a. optional field) is persisted like as if it
is ["type"], but the column get
- * deleted if value==null (so value read after will be null).
+ * A ["null","type"] union (a.k.a. optional field) is persisted as if it were
+ * ["type"]. When its value is null nothing is written; instead the cell (or,
+ * for a field mapped to a whole family, the family) is deleted, so a later
+ * read returns null. The same applies to null values inside maps and arrays.
+ * Deletes remove all stored versions of a cell, so an older version cannot
+ * resurface after the newest one is removed.
+ *
+ * IOExceptions raised by HBase are logged (with their stack trace) and not
+ * rethrown.
*
- * @param persistent Record to be persisted in HBase
+ * @param key
+ * Row key of the record
+ * @param persistent
+ * Record to be persisted in HBase
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void put(K key, T persistent) {
- try{
+ try {
Schema schema = persistent.getSchema();
byte[] keyRaw = toBytes(key);
Put put = new Put(keyRaw);
Delete delete = new Delete(keyRaw);
- boolean hasPuts = false;
- boolean hasDeletes = false;
List<Field> fields = schema.getFields();
- for (int i = 1; i<fields.size(); i++) {
+ for (int i = 1; i < fields.size(); i++) {
if (!persistent.isDirty(i)) {
continue;
}
Field field = fields.get(i);
- Type type = field.schema().getType();
Object o = persistent.get(i);
HBaseColumn hcol = mapping.getColumn(field.name());
if (hcol == null) {
- throw new RuntimeException("HBase mapping for field ["+
persistent.getClass().getName() +
- "#"+ field.name()+"] not found. Wrong gora-hbase-mapping.xml?");
- }
- switch(type) {
- case MAP:
- Set<Map.Entry> set = ((Map)o).entrySet();
- for(Entry entry: set) {
- byte[] qual = toBytes(entry.getKey());
- byte[] val = toBytes(entry.getValue(),
field.schema().getValueType());
- // Gora 207: Top-most record level ["null","type"] must be saved
raw. "null"=>delete
- if (val == null) { // value == null => must delete the column
- delete.deleteColumn(hcol.getFamily(), qual);
- hasDeletes = true;
- } else {
- put.add(hcol.getFamily(), qual, val);
- hasPuts = true;
- }
- }
- break;
- case ARRAY:
- List<?> array = (List<?>) o;
- int j=0;
- for(Object item : array) {
- byte[] val = toBytes(item);
- // Gora 207: Top-most record level ["null","type"]
- // must be saved raw. "null"=>delete
- if (val == null) { // value == null => must delete the column
- delete.deleteColumn(hcol.getFamily(), Bytes.toBytes(j++));
- hasDeletes = true;
- } else {
- put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
- hasPuts = true;
- }
- }
- break;
- default:
- // Gora 207: Top-most record level ["null","type"]
- // must be saved raw. "null"=>delete
- byte[] serializedBytes = toBytes(o, field.schema()) ;
- if (serializedBytes == null) { // value == null => must delete the
column
- delete.deleteColumn(hcol.getFamily(), hcol.getQualifier());
- hasDeletes = true;
- } else {
- put.add(hcol.getFamily(), hcol.getQualifier(), serializedBytes);
- hasPuts = true;
- }
- break;
+ throw new RuntimeException("HBase mapping for field ["
+ + persistent.getClass().getName() + "#" + field.name()
+ + "] not found. Wrong gora-hbase-mapping.xml?");
}
+ addPutsAndDeletes(put, delete, o, field.schema().getType(),
+ field.schema(), hcol, hcol.getQualifier());
}
- if (hasPuts) {
+ if (put.size() > 0) {
table.put(put);
}
- if (hasDeletes) {
+ if (delete.size() > 0) {
+ // One call is enough: the delete is built with deleteColumns()/
+ // deleteFamily(), which remove every version, so no older version of
+ // a deleted cell can reappear afterwards.
table.delete(delete);
}
- } catch(IOException ex2){
+ } catch (IOException ex2) {
- LOG.error(ex2.getMessage());
- LOG.error(ex2.getStackTrace().toString());
+ // Pass the exception itself so the real stack trace is logged;
+ // getStackTrace().toString() only printed the array's identity.
+ LOG.error(ex2.getMessage(), ex2);
}
}
+
+ /**
+ * Adds the HBase mutations needed to store {@code o} to {@code put} and
+ * {@code delete}, recursing into unions, maps and arrays.
+ *
+ * @param put accumulates the cells to write
+ * @param delete accumulates the cells/families to remove
+ * @param o value to persist; may be null
+ * @param type Avro type of {@code schema}
+ * @param schema Avro schema describing {@code o}
+ * @param hcol HBase column mapped to the top-level field
+ * @param qualifier qualifier to write under; null when the field is mapped
+ * to a whole family
+ * @throws IOException if a value cannot be serialized
+ */
+ private void addPutsAndDeletes(Put put, Delete delete, Object o, Type type,
+ Schema schema, HBaseColumn hcol, byte[] qualifier) throws IOException {
+ switch (type) {
+ case UNION:
+ if (isNullable(schema) && o == null) {
+ deleteColumnOrFamily(delete, hcol, qualifier);
+ } else {
+ int index = getResolvedUnionIndex(schema);
+ if (index > 1) { // more than 2 types in the union: serialize directly for now
+ byte[] serializedBytes = toBytes(o, schema);
+ put.add(hcol.getFamily(), qualifier, serializedBytes);
+ } else {
+ Schema resolvedSchema = schema.getTypes().get(index);
+ addPutsAndDeletes(put, delete, o, resolvedSchema.getType(),
+ resolvedSchema, hcol, qualifier);
+ }
+ }
+ break;
+ case MAP:
+ Set<Entry> set = ((Map) o).entrySet();
+ for (Entry entry : set) {
+ byte[] qual = toBytes(entry.getKey());
+ addPutsAndDeletes(put, delete, entry.getValue(), schema.getValueType()
+ .getType(), schema.getValueType(), hcol, qual);
+ }
+ break;
+ case ARRAY:
+ List<?> array = (List<?>) o;
+ int j = 0;
+ for (Object item : array) {
+ addPutsAndDeletes(put, delete, item, schema.getElementType().getType(),
+ schema.getElementType(), hcol, Bytes.toBytes(j++));
+ }
+ break;
+ default:
+ // Gora 207: a null value deletes the cell instead of writing an empty
+ // one (this handling existed before the refactoring and was lost).
+ byte[] serializedBytes = (o == null) ? null : toBytes(o, schema);
+ if (serializedBytes == null) {
+ deleteColumnOrFamily(delete, hcol, qualifier);
+ } else {
+ put.add(hcol.getFamily(), qualifier, serializedBytes);
+ }
+ break;
+ }
+ }
+
+ /**
+ * Marks the cell {@code hcol.family:qualifier} for deletion. When
+ * {@code qualifier} is null, the whole family is marked instead. Uses
+ * deleteColumns() (all versions) rather than deleteColumn(), which removes
+ * only the newest version and lets an older one resurface.
+ */
+ private void deleteColumnOrFamily(Delete delete, HBaseColumn hcol,
+ byte[] qualifier) {
+ if (qualifier == null) {
+ delete.deleteFamily(hcol.getFamily());
+ } else {
+ delete.deleteColumns(hcol.getFamily(), qualifier);
+ }
+ }
+
+ /**
+ * Tells whether a union schema accepts null.
+ *
+ * @param unionSchema a UNION schema
+ * @return true if one of its branches is the "null" type
+ */
+ private boolean isNullable(Schema unionSchema) {
+ List<Schema> branches = unionSchema.getTypes();
+ for (int k = 0; k < branches.size(); k++) {
+ if (Schema.Type.NULL == branches.get(k).getType()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Deleting by object is not implemented for the HBase store.
+ *
+ * @param obj ignored
+ * @throws UnsupportedOperationException always
+ */
public void delete(T obj) {
- throw new RuntimeException("Not implemented yet");
+ throw new UnsupportedOperationException("Not implemented yet");
}
@@ -488,18 +501,28 @@ implements Configurable {
"Wrong gora-hbase-mapping.xml?");
}
Schema fieldSchema = fieldMap.get(f).schema();
- switch (fieldSchema.getType()) {
- case MAP:
- case ARRAY:
- get.addFamily(col.family); break;
- default:
- get.addColumn(col.family, col.qualifier); break;
- }
+ addFamilyOrColumn(get, col, fieldSchema);
}
}
- private void addFields(Scan scan, Query<K,T> query)
- throws IOException {
+ /**
+ * Adds the cells backing one field to {@code get}: the whole family for
+ * map/array fields, and the single family:qualifier column otherwise. A
+ * ["null","type"] union is resolved to "type". Any other union is stored by
+ * put() as one serialized cell, so it maps to a single column.
+ */
+ private void addFamilyOrColumn(Get get, HBaseColumn col, Schema fieldSchema) {
+ switch (fieldSchema.getType()) {
+ case UNION:
+ int index = getResolvedUnionIndex(fieldSchema);
+ if (index > 1) {
+ // Not an optional-field union: there is no branch at "index" to
+ // recurse into (on a 2-branch union get(2) would throw).
+ get.addColumn(col.family, col.qualifier);
+ } else {
+ Schema resolvedSchema = fieldSchema.getTypes().get(index);
+ addFamilyOrColumn(get, col, resolvedSchema);
+ }
+ break;
+ case MAP:
+ case ARRAY:
+ get.addFamily(col.family);
+ break;
+ default:
+ get.addColumn(col.family, col.qualifier);
+ break;
+ }
+ }
+
+ private void addFields(Scan scan, Query<K, T> query) throws IOException {
String[] fields = query.getFields();
for (String f : fields) {
HBaseColumn col = mapping.getColumn(f);
@@ -508,19 +531,30 @@ implements Configurable {
"Wrong gora-hbase-mapping.xml?");
}
Schema fieldSchema = fieldMap.get(f).schema();
- switch (fieldSchema.getType()) {
- case MAP:
- case ARRAY:
- scan.addFamily(col.family); break;
- default:
- scan.addColumn(col.family, col.qualifier); break;
- }
+ addFamilyOrColumn(scan, col, fieldSchema);
}
}
- //TODO: HBase Get, Scan, Delete should extend some common interface with
addFamily, etc
- private void addFields(Delete delete, Query<K,T> query)
- throws IOException {
+ /**
+ * Adds the cells backing one field to {@code scan}: the whole family for
+ * map/array fields, and the single family:qualifier column otherwise. A
+ * ["null","type"] union is resolved to "type". Any other union is stored by
+ * put() as one serialized cell, so it maps to a single column.
+ */
+ private void addFamilyOrColumn(Scan scan, HBaseColumn col, Schema fieldSchema) {
+ switch (fieldSchema.getType()) {
+ case UNION:
+ int index = getResolvedUnionIndex(fieldSchema);
+ if (index > 1) {
+ // Not an optional-field union: there is no branch at "index" to
+ // recurse into (on a 2-branch union get(2) would throw).
+ scan.addColumn(col.family, col.qualifier);
+ } else {
+ Schema resolvedSchema = fieldSchema.getTypes().get(index);
+ addFamilyOrColumn(scan, col, resolvedSchema);
+ }
+ break;
+ case MAP:
+ case ARRAY:
+ scan.addFamily(col.family);
+ break;
+ default:
+ scan.addColumn(col.family, col.qualifier);
+ break;
+ }
+ }
+
+ // TODO: HBase Get, Scan, Delete should extend some common interface with
+ // addFamily, etc
+ private void addFields(Delete delete, Query<K, T> query) throws
IOException {
String[] fields = query.getFields();
for (String f : fields) {
HBaseColumn col = mapping.getColumn(f);
@@ -529,13 +563,25 @@ implements Configurable {
"Wrong gora-hbase-mapping.xml?");
}
Schema fieldSchema = fieldMap.get(f).schema();
- switch (fieldSchema.getType()) {
- case MAP:
- case ARRAY:
- delete.deleteFamily(col.family); break;
- default:
- delete.deleteColumn(col.family, col.qualifier); break;
- }
+ addFamilyOrColumn(delete, col, fieldSchema);
+ }
+ }
+
+ /**
+ * Marks the cells backing one field for deletion in {@code delete}: the
+ * whole family for map/array fields, and the family:qualifier column
+ * otherwise. A ["null","type"] union is resolved to "type". Any other union
+ * is stored by put() as one serialized cell, so it maps to a single column.
+ * Columns are deleted with deleteColumns() so that every version goes, not
+ * just the newest.
+ */
+ private void addFamilyOrColumn(Delete delete, HBaseColumn col,
+ Schema fieldSchema) {
+ switch (fieldSchema.getType()) {
+ case UNION:
+ int index = getResolvedUnionIndex(fieldSchema);
+ if (index > 1) {
+ // Not an optional-field union: there is no branch at "index" to
+ // recurse into (on a 2-branch union get(2) would throw).
+ delete.deleteColumns(col.family, col.qualifier);
+ } else {
+ Schema resolvedSchema = fieldSchema.getTypes().get(index);
+ addFamilyOrColumn(delete, col, resolvedSchema);
+ }
+ break;
+ case MAP:
+ case ARRAY:
+ delete.deleteFamily(col.family);
+ break;
+ default:
+ // deleteColumn() would drop only the newest version and let an older
+ // one resurface.
+ delete.deleteColumns(col.family, col.qualifier);
+ break;
}
}
@@ -574,47 +620,87 @@ implements Configurable {
}
Field field = fieldMap.get(f);
Schema fieldSchema = field.schema();
- switch(fieldSchema.getType()) {
- case MAP:
- NavigableMap<byte[], byte[]> qualMap =
- result.getNoVersionMap().get(col.getFamily());
- if (qualMap == null) {
- continue;
- }
- Schema valueSchema = fieldSchema.getValueType();
- Map map = new HashMap();
- for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
- map.put(new Utf8(Bytes.toString(e.getKey())),
- fromBytes(valueSchema, e.getValue()));
- }
- setField(persistent, field, map);
- break;
- case ARRAY:
- qualMap = result.getFamilyMap(col.getFamily());
- if (qualMap == null) {
- continue;
- }
- valueSchema = fieldSchema.getElementType();
- ArrayList arrayList = new ArrayList();
- DirtyListWrapper dirtyListWrapper = new DirtyListWrapper(arrayList);
- for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
- dirtyListWrapper.add(fromBytes(valueSchema, e.getValue()));
- }
- setField(persistent, field, arrayList);
- break;
- default:
- byte[] val = result.getValue(col.getFamily(), col.getQualifier());
- if (val == null) {
- continue;
- }
- setField(persistent, field, val);
- break;
- }
+ setField(result,persistent, col, field, fieldSchema);
}
persistent.clearDirty();
return persistent;
}
+ /**
+ * Reads one field of {@code persistent} out of an HBase {@link Result},
+ * choosing how to decode it from the field's Avro schema. When the backing
+ * cell or family is missing from the result, the field is left untouched.
+ */
+ private void setField(Result result, T persistent, HBaseColumn col,
+ Field field, Schema fieldSchema) throws IOException {
+ Type fieldType = fieldSchema.getType();
+ if (fieldType == Type.UNION) {
+ int branch = getResolvedUnionIndex(fieldSchema);
+ if (branch <= 1) {
+ setField(result, persistent, col, field, fieldSchema.getTypes().get(branch));
+ } else {
+ // more than 2 types in the union: deserialize the single cell directly for now
+ setFieldFromCell(result, persistent, col, field);
+ }
+ } else if (fieldType == Type.MAP) {
+ NavigableMap<byte[], byte[]> cells =
+ result.getNoVersionMap().get(col.getFamily());
+ if (cells != null) {
+ Schema valueSchema = fieldSchema.getValueType();
+ Map map = new HashMap();
+ for (Entry<byte[], byte[]> cell : cells.entrySet()) {
+ map.put(new Utf8(Bytes.toString(cell.getKey())),
+ fromBytes(valueSchema, cell.getValue()));
+ }
+ setField(persistent, field, map);
+ }
+ } else if (fieldType == Type.ARRAY) {
+ NavigableMap<byte[], byte[]> cells = result.getFamilyMap(col.getFamily());
+ if (cells != null) {
+ Schema elementSchema = fieldSchema.getElementType();
+ ArrayList arrayList = new ArrayList();
+ DirtyListWrapper dirtyListWrapper = new DirtyListWrapper(arrayList);
+ for (Entry<byte[], byte[]> cell : cells.entrySet()) {
+ dirtyListWrapper.add(fromBytes(elementSchema, cell.getValue()));
+ }
+ setField(persistent, field, arrayList);
+ }
+ } else {
+ setFieldFromCell(result, persistent, col, field);
+ }
+ }
+
+ /** Copies the raw family:qualifier cell of {@code col} into the field, if present. */
+ private void setFieldFromCell(Result result, T persistent, HBaseColumn col,
+ Field field) throws IOException {
+ byte[] val = result.getValue(col.getFamily(), col.getQualifier());
+ if (val != null) {
+ setField(persistent, field, val);
+ }
+ }
+
+ // TODO temporary solution, has to be changed after implementation of saving
+ // the index of union type
+ /**
+ * Picks the branch to use for a union schema.
+ *
+ * @param unionSchema a UNION schema
+ * @return the position (0 or 1) of the non-null branch when the union is
+ * exactly ["type","null"] or ["null","type"]; 2 for any other union
+ */
+ private int getResolvedUnionIndex(Schema unionSchema) {
+ List<Schema> branches = unionSchema.getTypes();
+ if (branches.size() != 2) {
+ return 2;
+ }
+ boolean firstIsNull = branches.get(0).getType() == Schema.Type.NULL;
+ boolean secondIsNull = branches.get(1).getType() == Schema.Type.NULL;
+ if (firstIsNull == secondIsNull) {
+ // neither branch is null, or both are: not an optional-field union
+ return 2;
+ }
+ return firstIsNull ? 1 : 0;
+ }
+
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setField(T persistent, Field field, Map map) {
persistent.put(field.pos(), new DirtyMapWrapper(map));
Modified:
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1572170&r1=1572169&r2=1572170&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
(original)
+++
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
Wed Feb 26 17:27:44 2014
@@ -232,41 +232,13 @@ public class HBaseByteInterface {
case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0};
case ENUM: return new byte[] { (byte)((Enum<?>) o).ordinal() };
case UNION:
- // XXX Special case: When writing the top-level field of a record we
must handle the
- // special case ["null","type"] definitions: this will be written as if
it was ["type"]
- // if not in a special case, will execute "case RECORD".
-
- if (schema.getTypes().size() == 2) {
-
- // schema [type0, type1]
- Type type0 = schema.getTypes().get(0).getType() ;
- Type type1 = schema.getTypes().get(1).getType() ;
-
- // Check if types are different and there's a "null", like
["null","type"] or ["type","null"]
- if (!type0.equals(type1)
- && ( type0.equals(Schema.Type.NULL)
- || type1.equals(Schema.Type.NULL))) {
-
- if (o == null) return null ;
-
- int index = GenericData.get().resolveUnion(schema, o);
- schema = schema.getTypes().get(index) ;
-
- return toBytes(o, schema) ; // Serialize as if schema was ["type"]
- }
-
- }
- // else
- // type = [type0,type1] where type0=type1
- // => Serialize like "case RECORD" with Avro
-
case RECORD:
SpecificDatumWriter writer = (SpecificDatumWriter<?>)
writerMap.get(schema.getFullName());
if (writer == null) {
writer = new SpecificDatumWriter(schema);// ignore dirty bits
writerMap.put(schema.getFullName(),writer);
}
-
+
BinaryEncoder encoderFromCache = encoders.get();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
outputStream.set(bos);
@@ -274,11 +246,11 @@ public class HBaseByteInterface {
if (encoderFromCache == null) {
encoders.set(encoder);
}
-
+
//reset the buffers
ByteArrayOutputStream os = outputStream.get();
os.reset();
-
+
writer.write(o, encoder);
encoder.flush();
return os.toByteArray();
Modified: gora/branches/GORA_94/gora-hbase/src/test/conf/gora-hbase-mapping.xml
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/test/conf/gora-hbase-mapping.xml?rev=1572170&r1=1572169&r2=1572170&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/test/conf/gora-hbase-mapping.xml
(original)
+++ gora/branches/GORA_94/gora-hbase/src/test/conf/gora-hbase-mapping.xml Wed
Feb 26 17:27:44 2014
@@ -44,6 +44,7 @@
<field name="content" family="content"/>
<field name="parsedContent" family="parsedContent"/>
<field name="outlinks" family="outlinks"/>
+ <field name="headers" family="headers"/>
<field name="metadata" family="common" qualifier="metadata"/>
</class>