Author: lewismc
Date: Wed Feb 26 17:27:44 2014
New Revision: 1572170

URL: http://svn.apache.org/r1572170
Log:
GORA-94v11 and GORA-246v5

Modified:
    gora/branches/GORA_94/gora-core/src/examples/avro/employee.json
    gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java
    gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
    gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
    gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
    gora/branches/GORA_94/gora-hbase/src/test/conf/gora-hbase-mapping.xml

Modified: gora/branches/GORA_94/gora-core/src/examples/avro/employee.json
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/examples/avro/employee.json?rev=1572170&r1=1572169&r2=1572170&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/examples/avro/employee.json (original)
+++ gora/branches/GORA_94/gora-core/src/examples/avro/employee.json Wed Feb 26 
17:27:44 2014
@@ -17,7 +17,8 @@
            {"name": "url", "type": ["null","string"], "default":null},
            {"name": "content", "type": ["null","bytes"],"default":null},
            {"name": "parsedContent", "type": {"type":"array", "items": 
"string"},"default":{}},
-           {"name": "outlinks", "type": {"type":"map", 
"values":"string"},"default":{}},
+           {"name": "outlinks", "type": {"type":"map", "values":["null", 
"string"]},"default":{}},
+           {"name": "headers", "type": ["null", {"type": "map", "values": 
["null", "string"]}],"default":null},
            {"name": "metadata", "default":null, "type": {
             "name": "Metadata",
             "type": "record",

Modified: 
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java?rev=1572170&r1=1572169&r2=1572170&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java
 (original)
+++ 
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java
 Wed Feb 26 17:27:44 2014
@@ -210,7 +210,11 @@ public abstract class DataStoreTestBase 
   public void testUpdate() throws IOException, Exception {
     log.info("test method: testUpdate");
     DataStoreTestUtil.testUpdateEmployee(employeeStore);
-    DataStoreTestUtil.testUpdateWebPage(webPageStore);
+    DataStoreTestUtil.testUpdateWebPagePutToArray(webPageStore);
+    DataStoreTestUtil.testUpdateWebPagePutToNotNullableMap(webPageStore);
+    DataStoreTestUtil.testUpdateWebPagePutToNullableMap(webPageStore);
+    DataStoreTestUtil.testUpdateWebPageRemoveMapEntry(webPageStore);
+    DataStoreTestUtil.testUpdateWebPageRemoveField(webPageStore);
   }
 
   public void testEmptyUpdate() throws IOException, Exception {

Modified: 
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java?rev=1572170&r1=1572169&r2=1572170&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
 (original)
+++ 
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
 Wed Feb 26 17:27:44 2014
@@ -232,7 +232,7 @@ public class DataStoreTestUtil {
 
     Employee employee = DataStoreTestUtil.createEmployee(dataStore);
     employee.setBoss(new Utf8("Real boss")) ;
-    
+
     String ssn = employee.getSsn().toString();
     dataStore.put(ssn, employee);
     dataStore.flush();
@@ -313,7 +313,7 @@ public class DataStoreTestUtil {
       if (employee.getBoss() instanceof Utf8) {
         String beforeBoss = employee.getBoss().toString();
         String afterBoss = after.getBoss().toString();
-        assertEquals("Boss String field values in UNION should be the same", 
+        assertEquals("Boss String field values in UNION should be the same",
             beforeBoss, afterBoss);
       } else {
         Employee beforeBoss = (Employee) employee.getBoss();
@@ -480,15 +480,14 @@ public class DataStoreTestUtil {
    * @throws IOException
    * @throws Exception
    */
-  public static void testUpdateWebPage(DataStore<String, WebPage> dataStore)
+  public static void testUpdateWebPagePutToArray(DataStore<String, WebPage> 
dataStore)
   throws IOException, Exception {
     dataStore.createSchema();
 
     String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
-        "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g"};
+        "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
     String content = "content";
     String parsedContent = "parsedContent";
-    String anchor = "anchor";
 
     int parsedContentCount = 0;
 
@@ -496,13 +495,9 @@ public class DataStoreTestUtil {
     for (int i = 0; i < urls.length; i++) {
       WebPage webPage = WebPage.newBuilder().build();
       webPage.setUrl(new Utf8(urls[i]));
-      webPage.setParsedContent(new ArrayList<CharSequence>());
       for (parsedContentCount = 0; parsedContentCount < 5; 
parsedContentCount++) {
         webPage.getParsedContent().add(new Utf8(parsedContent + i + "," + 
parsedContentCount));
       }
-      for (int j = 0; j < urls.length; j += 2) {
-        webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
-      }
       dataStore.put(webPage.getUrl().toString(), webPage);
     }
 
@@ -514,11 +509,49 @@ public class DataStoreTestUtil {
       for (parsedContentCount = 5; parsedContentCount < 10; 
parsedContentCount++) {
         webPage.getParsedContent().add(new Utf8(parsedContent + i + "," + 
parsedContentCount));
       }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
+
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.get(urls[i]);
+      assertEquals(content + i, ByteUtils.toString( 
toByteArray(webPage.getContent()) ));
+      assertEquals(10, webPage.getParsedContent().size());
+      int j = 0;
+      for (CharSequence pc : webPage.getParsedContent()) {
+        assertEquals(parsedContent + i + "," + j, pc.toString());
+        j++;
+      }
+    }
+  }
+
+  public static void testUpdateWebPagePutToNotNullableMap(DataStore<String, 
WebPage> dataStore)
+  throws IOException, Exception {
+    dataStore.createSchema();
+
+    String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+        "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
+    String anchor = "anchor";
+
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = WebPage.newBuilder().build();
+      webPage.setUrl(new Utf8(urls[i]));
+      for (int j = 0; j < urls.length; j += 2) {
+        webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
+      }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
+
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.get(urls[i]);
       webPage.getOutlinks().clear();
       for (int j = 1; j < urls.length; j += 2) {
         webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
       }
-      //test for double put of same entries
+      // test for double put of same entries
       for (int j = 1; j < urls.length; j += 2) {
         webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
       }
@@ -529,15 +562,9 @@ public class DataStoreTestUtil {
 
     for (int i = 0; i < urls.length; i++) {
       WebPage webPage = dataStore.get(urls[i]);
-      assertEquals(content + i, ByteUtils.toString( 
toByteArray(webPage.getContent()) ));
-      assertEquals(10, webPage.getParsedContent().size());
       int j = 0;
-      for (CharSequence pc : webPage.getParsedContent()) {
-        assertEquals(parsedContent + i + "," + j, pc.toString());
-        j++;
-      }
       int count = 0;
-      for (j = 0; j < urls.length; j++) {
+      for (j = 0; j < urls.length; j++) {  //TODO j++ or j+=2 ?
         CharSequence link = webPage.getOutlinks().get(new Utf8(anchor + j));
         assertNotNull(link);
         assertEquals(urls[j], link.toString());
@@ -545,11 +572,36 @@ public class DataStoreTestUtil {
       }
       assertEquals(count, webPage.getOutlinks().size());
     }
+  }
+
+  public static void testUpdateWebPagePutToNullableMap(DataStore<String, 
WebPage> dataStore)
+  throws IOException, Exception {
+    dataStore.createSchema();
+
+    String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+        "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
+    String header = "header";
+    String[] headers = { "firstHeader", "secondHeader", "thirdHeader",
+        "fourthHeader", "fifthHeader", "sixthHeader" };
+
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = WebPage.newBuilder().build();
+      webPage.setUrl(new Utf8(urls[i]));
+      //test put for nullable map field
+      webPage.setHeaders(new HashMap<CharSequence, CharSequence>());
+      for (int j = 0; j < headers.length; j += 2) {
+        webPage.getHeaders().put(new Utf8(header + j), new Utf8(headers[j]));
+      }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
 
     for (int i = 0; i < urls.length; i++) {
       WebPage webPage = dataStore.get(urls[i]);
-      for (int j = 0; j < urls.length; j += 2) {
-        webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
+      webPage.getHeaders().clear(); //TODO clear method does not work
+      for (int j = 1; j < headers.length; j += 2) {
+        webPage.getHeaders().put(new Utf8(header + j), new Utf8(headers[j]));
       }
       dataStore.put(webPage.getUrl().toString(), webPage);
     }
@@ -558,13 +610,95 @@ public class DataStoreTestUtil {
 
     for (int i = 0; i < urls.length; i++) {
       WebPage webPage = dataStore.get(urls[i]);
+      int j = 0;
       int count = 0;
+      for (j = 0; j < headers.length; j++) {  //TODO j++ or j+=2 ?
+        CharSequence headerSample = webPage.getHeaders().get(new Utf8(header + 
j));
+        assertNotNull(headerSample);
+        assertEquals(headers[j], headerSample.toString());
+        count++;
+      }
+      assertEquals(count, webPage.getHeaders().size());
+    }
+  }
+
+  public static void testUpdateWebPageRemoveMapEntry(DataStore<String, 
WebPage> dataStore)
+  throws IOException, Exception {
+    dataStore.createSchema();
+
+    String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+        "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
+    String anchor = "anchor";
+
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = WebPage.newBuilder().build();
+      webPage.setUrl(new Utf8(urls[i]));
       for (int j = 0; j < urls.length; j++) {
+        webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
+      }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
+
+    // map entry removal test
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.get(urls[i]);
+      for (int j = 1; j < urls.length; j += 2) {
+        webPage.getOutlinks().put(new Utf8(anchor + j), null);
+      }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
+
+    for (int i = 0; i < urls.length; i++) {
+      int count = 0;
+      WebPage webPage = dataStore.get(urls[i]);
+      for (int j = 1; j < urls.length; j += 2) {
         CharSequence link = webPage.getOutlinks().get(new Utf8(anchor + j));
-        assertNotNull(link);
-        assertEquals(urls[j], link.toString());
+        assertNull(link);
         count++;
       }
+      assertEquals(urls.length - count, webPage.getOutlinks().size());
+    }
+  }
+
+  public static void testUpdateWebPageRemoveField(DataStore<String, WebPage> 
dataStore)
+  throws IOException, Exception {
+    dataStore.createSchema();
+
+    String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+        "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
+    String anchor = "anchor";
+    String header = "header";
+    String[] headers = { "firstHeader", "secondHeader", "thirdHeader",
+        "fourthHeader", "fifthHeader", "sixthHeader" };
+
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = WebPage.newBuilder().build();
+      webPage.setUrl(new Utf8(urls[i]));
+      webPage.setHeaders(new HashMap<CharSequence, CharSequence>());
+      for (int j = 0; j < headers.length; j++) {
+        webPage.getHeaders().put(new Utf8(header + j), new Utf8(headers[j]));
+      }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
+
+    // nullable map field removal test
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.get(urls[i]);
+      webPage.setHeaders(null);
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
+
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.get(urls[i]);
+      assertNull(webPage.getHeaders());
     }
   }
 

Modified: 
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1572170&r1=1572169&r2=1572170&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
 (original)
+++ 
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
 Wed Feb 26 17:27:44 2014
@@ -224,97 +224,110 @@ implements Configurable {
   }
 
   /**
-   * {@inheritDoc}
-   * Serializes the Persistent data and saves in HBase.
-   * Topmost fields of the record are persisted in "raw" format (not avro 
serialized). This behavior happens
-   * in maps and arrays too.
+   * {@inheritDoc} Serializes the Persistent data and saves in HBase. Topmost
+   * fields of the record are persisted in "raw" format (not avro serialized).
+   * This behavior happens in maps and arrays too.
    * 
-   * ["null","type"] type (a.k.a. optional field) is persisted like as if it 
is ["type"], but the column get
-   * deleted if value==null (so value read after will be null).
+   * ["null","type"] type (a.k.a. optional field) is persisted like as if it is
+   * ["type"], but the column get deleted if value==null (so value read after
+   * will be null).
    * 
-   * @param persistent Record to be persisted in HBase
+   * @param persistent
+   *          Record to be persisted in HBase
    */
   @SuppressWarnings({ "unchecked", "rawtypes" })
   @Override
   public void put(K key, T persistent) {
-    try{
+    try {
       Schema schema = persistent.getSchema();
       byte[] keyRaw = toBytes(key);
       Put put = new Put(keyRaw);
       Delete delete = new Delete(keyRaw);
-      boolean hasPuts = false;
-      boolean hasDeletes = false;
       List<Field> fields = schema.getFields();
-      for (int i = 1; i<fields.size(); i++) {
+      for (int i = 1; i < fields.size(); i++) {
         if (!persistent.isDirty(i)) {
           continue;
         }
         Field field = fields.get(i);
-        Type type = field.schema().getType();
         Object o = persistent.get(i);
         HBaseColumn hcol = mapping.getColumn(field.name());
         if (hcol == null) {
-          throw new RuntimeException("HBase mapping for field ["+ 
persistent.getClass().getName() +
-              "#"+ field.name()+"] not found. Wrong gora-hbase-mapping.xml?");
-        }
-        switch(type) {
-          case MAP:
-            Set<Map.Entry> set = ((Map)o).entrySet();
-            for(Entry entry: set) {
-              byte[] qual = toBytes(entry.getKey());
-              byte[] val = toBytes(entry.getValue(), 
field.schema().getValueType());
-              // Gora 207: Top-most record level ["null","type"] must be saved 
raw. "null"=>delete
-              if (val == null) { // value == null => must delete the column
-                delete.deleteColumn(hcol.getFamily(), qual);
-                hasDeletes = true;
-              } else {
-                put.add(hcol.getFamily(), qual, val);
-                hasPuts = true;
-              }
-            }
-            break;
-          case ARRAY:
-            List<?> array = (List<?>) o;
-            int j=0;
-            for(Object item : array) {
-              byte[] val = toBytes(item);
-              // Gora 207: Top-most record level ["null","type"] 
-              // must be saved raw. "null"=>delete
-              if (val == null) { // value == null => must delete the column
-                delete.deleteColumn(hcol.getFamily(), Bytes.toBytes(j++));
-                hasDeletes = true;
-              } else {
-                put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
-                hasPuts = true;
-              }
-            }
-            break;
-          default:
-            // Gora 207: Top-most record level ["null","type"] 
-            // must be saved raw. "null"=>delete
-            byte[] serializedBytes = toBytes(o, field.schema()) ;
-            if (serializedBytes == null) { // value == null => must delete the 
column
-              delete.deleteColumn(hcol.getFamily(), hcol.getQualifier());
-              hasDeletes = true;
-            } else {
-              put.add(hcol.getFamily(), hcol.getQualifier(), serializedBytes);
-              hasPuts = true;
-            }
-            break;
+          throw new RuntimeException("HBase mapping for field ["
+              + persistent.getClass().getName() + "#" + field.name()
+              + "] not found. Wrong gora-hbase-mapping.xml?");
         }
+        addPutsAndDeletes(put, delete, o, field.schema().getType(),
+            field.schema(), hcol, hcol.getQualifier());
       }
-      if (hasPuts) {
+      if (put.size() > 0) {
         table.put(put);
       }
-      if (hasDeletes) {
+      if (delete.size() > 0) {
+        table.delete(delete);
         table.delete(delete);
+        table.delete(delete); // HBase sometimes does not delete arbitrarily
       }
-    } catch(IOException ex2){
+    } catch (IOException ex2) {
       LOG.error(ex2.getMessage());
       LOG.error(ex2.getStackTrace().toString());
     }
   }
 
+  private void addPutsAndDeletes(Put put, Delete delete, Object o, Type type,
+      Schema schema, HBaseColumn hcol, byte[] qualifier) throws IOException {
+    switch (type) {
+    case UNION:
+      if (isNullable(schema) && o == null) {
+        if (qualifier == null) {
+          delete.deleteFamily(hcol.getFamily());
+        } else {
+          delete.deleteColumn(hcol.getFamily(), qualifier);
+        }
+      } else {
+//        int index = GenericData.get().resolveUnion(schema, o);
+        int index = getResolvedUnionIndex(schema);
+        if (index > 1) {  //if more than 2 type in union, serialize directly 
for now
+          byte[] serializedBytes = toBytes(o, schema);
+          put.add(hcol.getFamily(), qualifier, serializedBytes);
+        } else {
+          Schema resolvedSchema = schema.getTypes().get(index);
+          addPutsAndDeletes(put, delete, o, resolvedSchema.getType(),
+              resolvedSchema, hcol, qualifier);
+        }
+      }
+      break;
+    case MAP:
+      Set<Entry> set = ((Map) o).entrySet();
+      for (Entry entry : set) {
+        byte[] qual = toBytes(entry.getKey());
+        addPutsAndDeletes(put, delete, entry.getValue(), schema.getValueType()
+            .getType(), schema.getValueType(), hcol, qual);
+      }
+      break;
+    case ARRAY:
+      List<?> array = (List<?>) o;
+      int j = 0;
+      for (Object item : array) {
+        addPutsAndDeletes(put, delete, item, schema.getElementType().getType(),
+            schema.getElementType(), hcol, Bytes.toBytes(j++));
+      }
+      break;
+    default:
+      byte[] serializedBytes = toBytes(o, schema);
+      put.add(hcol.getFamily(), qualifier, serializedBytes);
+      break;
+    }
+  }
+
+  private boolean isNullable(Schema unionSchema) {
+    for (Schema innerSchema : unionSchema.getTypes()) {
+      if (innerSchema.getType().equals(Schema.Type.NULL)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   public void delete(T obj) {
     throw new RuntimeException("Not implemented yet");
   }
@@ -488,18 +501,28 @@ implements Configurable {
             "Wrong gora-hbase-mapping.xml?");
       }
       Schema fieldSchema = fieldMap.get(f).schema();
-      switch (fieldSchema.getType()) {
-        case MAP:
-        case ARRAY:
-          get.addFamily(col.family); break;
-        default:
-          get.addColumn(col.family, col.qualifier); break;
-      }
+      addFamilyOrColumn(get, col, fieldSchema);
     }
   }
 
-  private void addFields(Scan scan, Query<K,T> query)
-  throws IOException {
+  private void addFamilyOrColumn(Get get, HBaseColumn col, Schema fieldSchema) 
{
+    switch (fieldSchema.getType()) {
+    case UNION:
+      int index = getResolvedUnionIndex(fieldSchema);
+      Schema resolvedSchema = fieldSchema.getTypes().get(index);
+      addFamilyOrColumn(get, col, resolvedSchema);
+      break;
+    case MAP:
+    case ARRAY:
+      get.addFamily(col.family);
+      break;
+    default:
+      get.addColumn(col.family, col.qualifier);
+      break;
+    }
+  }
+
+  private void addFields(Scan scan, Query<K, T> query) throws IOException {
     String[] fields = query.getFields();
     for (String f : fields) {
       HBaseColumn col = mapping.getColumn(f);
@@ -508,19 +531,30 @@ implements Configurable {
             "Wrong gora-hbase-mapping.xml?");
       }
       Schema fieldSchema = fieldMap.get(f).schema();
-      switch (fieldSchema.getType()) {
-        case MAP:
-        case ARRAY:
-          scan.addFamily(col.family); break;
-        default:
-          scan.addColumn(col.family, col.qualifier); break;
-      }
+      addFamilyOrColumn(scan, col, fieldSchema);
     }
   }
 
-  //TODO: HBase Get, Scan, Delete should extend some common interface with 
addFamily, etc
-  private void addFields(Delete delete, Query<K,T> query)
-    throws IOException {
+  private void addFamilyOrColumn(Scan scan, HBaseColumn col, Schema 
fieldSchema) {
+    switch (fieldSchema.getType()) {
+    case UNION:
+      int index = getResolvedUnionIndex(fieldSchema);
+      Schema resolvedSchema = fieldSchema.getTypes().get(index);
+      addFamilyOrColumn(scan, col, resolvedSchema);
+      break;
+    case MAP:
+    case ARRAY:
+      scan.addFamily(col.family);
+      break;
+    default:
+      scan.addColumn(col.family, col.qualifier);
+      break;
+    }
+  }
+
+  // TODO: HBase Get, Scan, Delete should extend some common interface with
+  // addFamily, etc
+  private void addFields(Delete delete, Query<K, T> query)    throws 
IOException {
     String[] fields = query.getFields();
     for (String f : fields) {
       HBaseColumn col = mapping.getColumn(f);
@@ -529,13 +563,25 @@ implements Configurable {
             "Wrong gora-hbase-mapping.xml?");
       }
       Schema fieldSchema = fieldMap.get(f).schema();
-      switch (fieldSchema.getType()) {
-        case MAP:
-        case ARRAY:
-          delete.deleteFamily(col.family); break;
-        default:
-          delete.deleteColumn(col.family, col.qualifier); break;
-      }
+      addFamilyOrColumn(delete, col, fieldSchema);
+    }
+  }
+
+  private void addFamilyOrColumn(Delete delete, HBaseColumn col,
+      Schema fieldSchema) {
+    switch (fieldSchema.getType()) {
+    case UNION:
+      int index = getResolvedUnionIndex(fieldSchema);
+      Schema resolvedSchema = fieldSchema.getTypes().get(index);
+      addFamilyOrColumn(delete, col, resolvedSchema);
+      break;
+    case MAP:
+    case ARRAY:
+      delete.deleteFamily(col.family);
+      break;
+    default:
+      delete.deleteColumn(col.family, col.qualifier);
+      break;
     }
   }
 
@@ -574,47 +620,87 @@ implements Configurable {
       }
       Field field = fieldMap.get(f);
       Schema fieldSchema = field.schema();
-      switch(fieldSchema.getType()) {
-        case MAP:
-          NavigableMap<byte[], byte[]> qualMap =
-            result.getNoVersionMap().get(col.getFamily());
-          if (qualMap == null) {
-            continue;
-          }
-          Schema valueSchema = fieldSchema.getValueType();
-          Map map = new HashMap();
-          for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
-            map.put(new Utf8(Bytes.toString(e.getKey())),
-                fromBytes(valueSchema, e.getValue()));
-          }
-          setField(persistent, field, map);
-          break;
-        case ARRAY:
-          qualMap = result.getFamilyMap(col.getFamily());
-          if (qualMap == null) {
-            continue;
-          }
-          valueSchema = fieldSchema.getElementType();
-          ArrayList arrayList = new ArrayList();
-          DirtyListWrapper dirtyListWrapper = new DirtyListWrapper(arrayList);
-          for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
-            dirtyListWrapper.add(fromBytes(valueSchema, e.getValue()));
-          }
-          setField(persistent, field, arrayList);
-          break;
-        default:
-          byte[] val = result.getValue(col.getFamily(), col.getQualifier());
-          if (val == null) {
-            continue;
-          }
-          setField(persistent, field, val);
-          break;
-      }
+      setField(result,persistent, col, field, fieldSchema);
     }
     persistent.clearDirty();
     return persistent;
   }
 
+  private void setField(Result result, T persistent, HBaseColumn col,
+      Field field, Schema fieldSchema) throws IOException {
+    switch (fieldSchema.getType()) {
+    case UNION:
+      int index = getResolvedUnionIndex(fieldSchema);
+      if (index > 1) { //if more than 2 type in union, deserialize directly 
for now
+        byte[] val = result.getValue(col.getFamily(), col.getQualifier());
+        if (val == null) {
+          return;
+        }
+        setField(persistent, field, val);
+      } else {
+        Schema resolvedSchema = fieldSchema.getTypes().get(index);
+        setField(result, persistent, col, field, resolvedSchema);
+      }
+      break;
+    case MAP:
+      NavigableMap<byte[], byte[]> qualMap = result.getNoVersionMap().get(
+          col.getFamily());
+      if (qualMap == null) {
+        return;
+      }
+      Schema valueSchema = fieldSchema.getValueType();
+      Map map = new HashMap();
+      for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
+        map.put(new Utf8(Bytes.toString(e.getKey())),
+            fromBytes(valueSchema, e.getValue()));
+      }
+      setField(persistent, field, map);
+      break;
+    case ARRAY:
+      qualMap = result.getFamilyMap(col.getFamily());
+      if (qualMap == null) {
+        return;
+      }
+      valueSchema = fieldSchema.getElementType();
+      ArrayList arrayList = new ArrayList();
+      DirtyListWrapper dirtyListWrapper = new DirtyListWrapper(arrayList);
+      for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
+        dirtyListWrapper.add(fromBytes(valueSchema, e.getValue()));
+      }
+      setField(persistent, field, arrayList);
+      break;
+    default:
+      byte[] val = result.getValue(col.getFamily(), col.getQualifier());
+      if (val == null) {
+        return;
+      }
+      setField(persistent, field, val);
+      break;
+    }
+  }
+
+  //TODO temporary solution, has to be changed after implementation of saving 
the index of union type
+  private int getResolvedUnionIndex(Schema unionScema) {
+    if (unionScema.getTypes().size() == 2) {
+
+      // schema [type0, type1]
+      Type type0 = unionScema.getTypes().get(0).getType();
+      Type type1 = unionScema.getTypes().get(1).getType();
+
+      // Check if types are different and there's a "null", like 
["null","type"]
+      // or ["type","null"]
+      if (!type0.equals(type1)
+          && (type0.equals(Schema.Type.NULL) || 
type1.equals(Schema.Type.NULL))) {
+
+        if (type0.equals(Schema.Type.NULL))
+          return 1;
+        else
+          return 0;
+      }
+    }
+    return 2;
+  }
+
   @SuppressWarnings({ "unchecked", "rawtypes" })
   private void setField(T persistent, Field field, Map map) {
     persistent.put(field.pos(), new DirtyMapWrapper(map));

Modified: 
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1572170&r1=1572169&r2=1572170&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
 (original)
+++ 
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
 Wed Feb 26 17:27:44 2014
@@ -232,41 +232,13 @@ public class HBaseByteInterface {
     case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0};
     case ENUM:    return new byte[] { (byte)((Enum<?>) o).ordinal() };
     case UNION:
-      // XXX Special case: When writing the top-level field of a record we 
must handle the
-      // special case ["null","type"] definitions: this will be written as if 
it was ["type"]
-      // if not in a special case, will execute "case RECORD".
-      
-      if (schema.getTypes().size() == 2) {
-        
-        // schema [type0, type1]
-        Type type0 = schema.getTypes().get(0).getType() ;
-        Type type1 = schema.getTypes().get(1).getType() ;
-        
-        // Check if types are different and there's a "null", like 
["null","type"] or ["type","null"]
-        if (!type0.equals(type1)
-            && (   type0.equals(Schema.Type.NULL)
-                || type1.equals(Schema.Type.NULL))) {
-
-          if (o == null) return null ;
-          
-          int index = GenericData.get().resolveUnion(schema, o);
-          schema = schema.getTypes().get(index) ;
-          
-          return toBytes(o, schema) ; // Serialize as if schema was ["type"] 
-        }
-        
-      }
-      // else
-      //   type = [type0,type1] where type0=type1
-      // => Serialize like "case RECORD" with Avro
-      
     case RECORD:
       SpecificDatumWriter writer = (SpecificDatumWriter<?>) 
writerMap.get(schema.getFullName());
       if (writer == null) {
         writer = new SpecificDatumWriter(schema);// ignore dirty bits
         writerMap.put(schema.getFullName(),writer);
       }
-      
+
       BinaryEncoder encoderFromCache = encoders.get();
       ByteArrayOutputStream bos = new ByteArrayOutputStream();
       outputStream.set(bos);
@@ -274,11 +246,11 @@ public class HBaseByteInterface {
       if (encoderFromCache == null) {
         encoders.set(encoder);
       }
-      
+
       //reset the buffers
       ByteArrayOutputStream os = outputStream.get();
       os.reset();
-      
+
       writer.write(o, encoder);
       encoder.flush();
       return os.toByteArray();

Modified: gora/branches/GORA_94/gora-hbase/src/test/conf/gora-hbase-mapping.xml
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/test/conf/gora-hbase-mapping.xml?rev=1572170&r1=1572169&r2=1572170&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/test/conf/gora-hbase-mapping.xml 
(original)
+++ gora/branches/GORA_94/gora-hbase/src/test/conf/gora-hbase-mapping.xml Wed 
Feb 26 17:27:44 2014
@@ -44,6 +44,7 @@
     <field name="content" family="content"/>
     <field name="parsedContent" family="parsedContent"/>
     <field name="outlinks" family="outlinks"/>
+    <field name="headers" family="headers"/>
     <field name="metadata" family="common" qualifier="metadata"/>
   </class>
 


Reply via email to