Author: lewismc
Date: Sun Mar 30 11:07:06 2014
New Revision: 1583119

URL: http://svn.apache.org/r1583119
Log:
GORA-303 Upgrade to Avro 1.7.X in gora-solr

Modified:
    
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/util/IOUtils.java
    gora/branches/GORA_94/gora-solr/pom.xml
    
gora/branches/GORA_94/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java
    gora/branches/GORA_94/gora-solr/src/test/conf/gora-solr-mapping.xml
    gora/branches/GORA_94/gora-solr/src/test/conf/gora.properties
    gora/branches/GORA_94/gora-solr/src/test/conf/solr/Employee/conf/schema.xml
    gora/branches/GORA_94/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml
    
gora/branches/GORA_94/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java
    gora/branches/GORA_94/pom.xml

Modified: 
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/util/IOUtils.java
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/util/IOUtils.java?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/util/IOUtils.java 
(original)
+++ 
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/util/IOUtils.java 
Sun Mar 30 11:07:06 2014
@@ -165,6 +165,18 @@ public class IOUtils {
   /**
    * Serializes the field object using the datumWriter.
    */
+  public static<T> void serialize(OutputStream os,
+      SpecificDatumWriter<T> datumWriter, Schema schema, T object)
+      throws IOException {
+
+    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
+    datumWriter.write(object, encoder);
+    encoder.flush();
+  }
+  
+  /**
+   * Serializes the field object using the datumWriter.
+   */
   public static<T extends SpecificRecord> byte[] 
serialize(SpecificDatumWriter<T> datumWriter
       , Schema schema, T object) throws IOException {
     ByteArrayOutputStream os = new ByteArrayOutputStream();
@@ -172,6 +184,16 @@ public class IOUtils {
     return os.toByteArray();
   }
 
+  /**
+   * Serializes the field object using the datumWriter.
+   */
+  public static<T> byte[] serialize(SpecificDatumWriter<T> datumWriter
+      , Schema schema, T object) throws IOException {
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    serialize(os, datumWriter, schema, object);
+    return os.toByteArray();
+  }
+  
   /** Deserializes the object in the given datainput using
    * available Hadoop serializations.
    * @throws IOException
@@ -258,6 +280,16 @@ public class IOUtils {
   }
 
   /**
+   * Deserializes the field object using the datumReader.
+   */
+  public static<K, T> T deserialize(byte[] bytes,
+      SpecificDatumReader<T> datumReader, Schema schema, T object)
+      throws IOException {
+    decoder = DecoderFactory.get().binaryDecoder(bytes, decoder);
+    return (T)datumReader.read(object, decoder);
+  }
+  
+  /**
    * Writes a byte[] to the output, representing whether each given field is 
null
    * or not. A Vint and ceil( fields.length / 8 ) bytes are written to the 
output.
    * @param out the output to write to

Modified: gora/branches/GORA_94/gora-solr/pom.xml
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-solr/pom.xml?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-solr/pom.xml (original)
+++ gora/branches/GORA_94/gora-solr/pom.xml Sun Mar 30 11:07:06 2014
@@ -188,7 +188,7 @@
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
-      <scope>compile</scope>
+      <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>

Modified: 
gora/branches/GORA_94/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java
 (original)
+++ 
gora/branches/GORA_94/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java
 Sun Mar 30 11:07:06 2014
@@ -17,13 +17,20 @@ package org.apache.gora.solr.store;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
-import org.apache.gora.persistency.StateManager;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
@@ -53,8 +60,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, 
T> {
-    
-  private static final Logger LOG = LoggerFactory.getLogger( SolrStore.class );
+
+  private static final Logger LOG = LoggerFactory.getLogger(SolrStore.class);
 
   protected static final String DEFAULT_MAPPING_FILE = "gora-solr-mapping.xml";
 
@@ -63,15 +70,15 @@ public class SolrStore<K, T extends Pers
   protected static final String SOLR_CONFIG_PROPERTY = "solr.config";
 
   protected static final String SOLR_SCHEMA_PROPERTY = "solr.schema";
-    
+
   protected static final String SOLR_BATCH_SIZE_PROPERTY = "solr.batchSize";
-    
-  //protected static final String SOLR_SOLRJSERVER_IMPL = "solr.solrjserver";
+
+  // protected static final String SOLR_SOLRJSERVER_IMPL = "solr.solrjserver";
 
   protected static final String SOLR_COMMIT_WITHIN_PROPERTY = 
"solr.commitWithin";
 
   protected static final String SOLR_RESULTS_SIZE_PROPERTY = 
"solr.resultsSize";
-    
+
   protected static final int DEFAULT_BATCH_SIZE = 100;
 
   protected static final int DEFAULT_COMMIT_WITHIN = 1000;
@@ -92,85 +99,116 @@ public class SolrStore<K, T extends Pers
 
   private int resultsSize = DEFAULT_RESULTS_SIZE;
 
-  @Override
-  public void initialize( Class<K> keyClass, Class<T> persistentClass, 
Properties properties ) {
-    super.initialize( keyClass, persistentClass, properties );
-    try {
-      String mappingFile = DataStoreFactory.getMappingFile( properties, this, 
DEFAULT_MAPPING_FILE );
-      mapping = readMapping( mappingFile );
-    }
-    catch ( IOException e ) {
-      LOG.error( e.getMessage() );
-      LOG.error( e.getStackTrace().toString() );
-    }
-
-    solrServerUrl = DataStoreFactory.findProperty( properties, this, 
SOLR_URL_PROPERTY, null );
-    solrConfig = DataStoreFactory.findProperty( properties, this, 
SOLR_CONFIG_PROPERTY, null );
-    solrSchema = DataStoreFactory.findProperty( properties, this, 
SOLR_SCHEMA_PROPERTY, null );
-    LOG.info( "Using Solr server at " + solrServerUrl );
-    adminServer = new HttpSolrServer( solrServerUrl );
-    server = new HttpSolrServer( solrServerUrl + "/" + mapping.getCoreName() );
-    if ( autoCreateSchema ) {
+  /**
+   * Default union schema index, with value "0", used when the concrete type
+   * of a stored Avro union value cannot be determined.
+   */
+  public static int DEFAULT_UNION_SCHEMA = 0;
+
+  /*
+   * Cache of datum readers and writers, keyed by schema. Readers and writers
+   * are not thread safe before Avro 1.4.0 (see AVRO-650); once they are
+   * thread safe, a single shared reader and writer pair per schema is
+   * sufficient (as these ConcurrentHashMaps provide), instead of one per
+   * thread.
+   */
+
+  public static final ConcurrentHashMap<String, SpecificDatumReader<?>> 
readerMap = new ConcurrentHashMap<String, SpecificDatumReader<?>>();
+
+  public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> 
writerMap = new ConcurrentHashMap<String, SpecificDatumWriter<?>>();
+
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass,
+      Properties properties) {
+    super.initialize(keyClass, persistentClass, properties);
+    try {
+      String mappingFile = DataStoreFactory.getMappingFile(properties, this,
+          DEFAULT_MAPPING_FILE);
+      mapping = readMapping(mappingFile);
+    } catch (IOException e) {
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
+    }
+
+    solrServerUrl = DataStoreFactory.findProperty(properties, this,
+        SOLR_URL_PROPERTY, null);
+    solrConfig = DataStoreFactory.findProperty(properties, this,
+        SOLR_CONFIG_PROPERTY, null);
+    solrSchema = DataStoreFactory.findProperty(properties, this,
+        SOLR_SCHEMA_PROPERTY, null);
+    LOG.info("Using Solr server at " + solrServerUrl);
+    adminServer = new HttpSolrServer(solrServerUrl);
+    server = new HttpSolrServer(solrServerUrl + "/" + mapping.getCoreName());
+    if (autoCreateSchema) {
       createSchema();
     }
-    String batchSizeString = DataStoreFactory.findProperty( properties, this, 
SOLR_BATCH_SIZE_PROPERTY, null );
-    if ( batchSizeString != null ) {
+    String batchSizeString = DataStoreFactory.findProperty(properties, this,
+        SOLR_BATCH_SIZE_PROPERTY, null);
+    if (batchSizeString != null) {
       try {
-        batchSize = Integer.parseInt( batchSizeString );
-      } catch ( NumberFormatException nfe ) {
-        LOG.warn( "Invalid batch size '" + batchSizeString + "', using default 
" + DEFAULT_BATCH_SIZE );
+        batchSize = Integer.parseInt(batchSizeString);
+      } catch (NumberFormatException nfe) {
+        LOG.warn("Invalid batch size '" + batchSizeString + "', using default "
+            + DEFAULT_BATCH_SIZE);
       }
     }
-    batch = new ArrayList<SolrInputDocument>( batchSize );
-    String commitWithinString = DataStoreFactory.findProperty( properties, 
this, SOLR_COMMIT_WITHIN_PROPERTY, null );
-    if ( commitWithinString != null ) {
+    batch = new ArrayList<SolrInputDocument>(batchSize);
+    String commitWithinString = DataStoreFactory.findProperty(properties, this,
+        SOLR_COMMIT_WITHIN_PROPERTY, null);
+    if (commitWithinString != null) {
       try {
-        commitWithin = Integer.parseInt( commitWithinString );
-      } catch ( NumberFormatException nfe ) {
-        LOG.warn( "Invalid commit within '" + commitWithinString + "', using 
default " + DEFAULT_COMMIT_WITHIN );
+        commitWithin = Integer.parseInt(commitWithinString);
+      } catch (NumberFormatException nfe) {
+        LOG.warn("Invalid commit within '" + commitWithinString
+            + "', using default " + DEFAULT_COMMIT_WITHIN);
       }
     }
-    String resultsSizeString = DataStoreFactory.findProperty( properties, 
this, SOLR_RESULTS_SIZE_PROPERTY, null );
-    if ( resultsSizeString != null ) {
+    String resultsSizeString = DataStoreFactory.findProperty(properties, this,
+        SOLR_RESULTS_SIZE_PROPERTY, null);
+    if (resultsSizeString != null) {
       try {
-        resultsSize = Integer.parseInt( resultsSizeString );
-      } catch ( NumberFormatException nfe ) {
-        LOG.warn( "Invalid results size '" + resultsSizeString + "', using 
default " + DEFAULT_RESULTS_SIZE );
+        resultsSize = Integer.parseInt(resultsSizeString);
+      } catch (NumberFormatException nfe) {
+        LOG.warn("Invalid results size '" + resultsSizeString
+            + "', using default " + DEFAULT_RESULTS_SIZE);
       }
     }
   }
 
   @SuppressWarnings("unchecked")
-  private SolrMapping readMapping( String filename ) throws IOException {
+  private SolrMapping readMapping(String filename) throws IOException {
     SolrMapping map = new SolrMapping();
     try {
       SAXBuilder builder = new SAXBuilder();
-      Document doc = builder.build( 
getClass().getClassLoader().getResourceAsStream( filename ) );
+      Document doc = builder.build(getClass().getClassLoader()
+          .getResourceAsStream(filename));
 
-      List<Element> classes = doc.getRootElement().getChildren( "class" );
+      List<Element> classes = doc.getRootElement().getChildren("class");
 
-      for ( Element classElement : classes ) {
-        if ( classElement.getAttributeValue( "keyClass" ).equals( 
keyClass.getCanonicalName() )
-            && classElement.getAttributeValue( "name" ).equals( 
persistentClass.getCanonicalName() ) ) {
-
-          String tableName = getSchemaName( classElement.getAttributeValue( 
"table" ), persistentClass );
-          map.setCoreName( tableName );
-
-          Element primaryKeyEl = classElement.getChild( "primarykey" );
-          map.setPrimaryKey( primaryKeyEl.getAttributeValue( "column" ) );
-
-          List<Element> fields = classElement.getChildren( "field" );
-
-          for ( Element field : fields ) {
-            String fieldName = field.getAttributeValue( "name" );
-            String columnName = field.getAttributeValue( "column" );
-            map.addField( fieldName, columnName );
+      for (Element classElement : classes) {
+        if (classElement.getAttributeValue("keyClass").equals(
+            keyClass.getCanonicalName())
+            && classElement.getAttributeValue("name").equals(
+                persistentClass.getCanonicalName())) {
+
+          String tableName = getSchemaName(
+              classElement.getAttributeValue("table"), persistentClass);
+          map.setCoreName(tableName);
+
+          Element primaryKeyEl = classElement.getChild("primarykey");
+          map.setPrimaryKey(primaryKeyEl.getAttributeValue("column"));
+
+          List<Element> fields = classElement.getChildren("field");
+
+          for (Element field : fields) {
+            String fieldName = field.getAttributeValue("name");
+            String columnName = field.getAttributeValue("column");
+            map.addField(fieldName, columnName);
           }
           break;
         }
       }
-    } catch ( Exception ex ) {
-      throw new IOException( ex );
+    } catch (Exception ex) {
+      throw new IOException(ex);
     }
 
     return map;
@@ -188,11 +226,11 @@ public class SolrStore<K, T extends Pers
   @Override
   public void createSchema() {
     try {
-      if ( !schemaExists() )
-          CoreAdminRequest.createCore( mapping.getCoreName(), 
mapping.getCoreName(), adminServer, solrConfig,
-          solrSchema );
-    } catch ( Exception e ) {
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      if (!schemaExists())
+        CoreAdminRequest.createCore(mapping.getCoreName(),
+            mapping.getCoreName(), adminServer, solrConfig, solrSchema);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e.getStackTrace().toString());
     }
   }
 
@@ -200,11 +238,11 @@ public class SolrStore<K, T extends Pers
   /** Default implementation deletes and recreates the schema*/
   public void truncateSchema() {
     try {
-      server.deleteByQuery( "*:*" );
+      server.deleteByQuery("*:*");
       server.commit();
-    } catch ( Exception e ) {
+    } catch (Exception e) {
       // ignore?
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      LOG.error(e.getMessage(), e.getStackTrace().toString());
     }
   }
 
@@ -212,20 +250,20 @@ public class SolrStore<K, T extends Pers
   public void deleteSchema() {
     // XXX should this be only in truncateSchema ???
     try {
-      server.deleteByQuery( "*:*" );
+      server.deleteByQuery("*:*");
       server.commit();
-    } catch ( Exception e ) {
-    // ignore?
-    // LOG.error(e.getMessage());
-    // LOG.error(e.getStackTrace().toString());
+    } catch (Exception e) {
+      // ignore?
+      // LOG.error(e.getMessage());
+      // LOG.error(e.getStackTrace().toString());
     }
     try {
-      CoreAdminRequest.unloadCore( mapping.getCoreName(), adminServer );
-    } catch ( Exception e ) {
-      if ( e.getMessage().contains( "No such core" ) ) {
+      CoreAdminRequest.unloadCore(mapping.getCoreName(), adminServer);
+    } catch (Exception e) {
+      if (e.getMessage().contains("No such core")) {
         return; // it's ok, the core is not there
       } else {
-        LOG.error( e.getMessage(), e.getStackTrace().toString() );
+        LOG.error(e.getMessage(), e.getStackTrace().toString());
       }
     }
   }
@@ -234,239 +272,360 @@ public class SolrStore<K, T extends Pers
   public boolean schemaExists() {
     boolean exists = false;
     try {
-      CoreAdminResponse rsp = CoreAdminRequest.getStatus( 
mapping.getCoreName(), adminServer );
-      exists = rsp.getUptime( mapping.getCoreName() ) != null;
-    } catch ( Exception e ) {
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      CoreAdminResponse rsp = CoreAdminRequest.getStatus(mapping.getCoreName(),
+          adminServer);
+      exists = rsp.getUptime(mapping.getCoreName()) != null;
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e.getStackTrace().toString());
     }
     return exists;
   }
 
-  private static final String toDelimitedString( String[] arr, String sep ) {
-    if ( arr == null || arr.length == 0 ) {
+  private static final String toDelimitedString(String[] arr, String sep) {
+    if (arr == null || arr.length == 0) {
       return "";
     }
     StringBuilder sb = new StringBuilder();
-    for ( int i = 0; i < arr.length; i++ ) {
-      if ( i > 0 )
-        sb.append( sep );
-        sb.append( arr[i] );
+    for (int i = 0; i < arr.length; i++) {
+      if (i > 0)
+        sb.append(sep);
+      sb.append(arr[i]);
     }
     return sb.toString();
   }
 
-  public static String escapeQueryKey( String key ) {
-    if ( key == null ) {
+  public static String escapeQueryKey(String key) {
+    if (key == null) {
       return null;
     }
     StringBuilder sb = new StringBuilder();
-    for ( int i = 0; i < key.length(); i++ ) {
-      char c = key.charAt( i );
-      switch ( c ) {
-        case ':':
-        case '*':
-          sb.append( "\\" + c );
-          break;
-        default:
-        sb.append( c );
+    for (int i = 0; i < key.length(); i++) {
+      char c = key.charAt(i);
+      switch (c) {
+      case ':':
+      case '*':
+        sb.append("\\" + c);
+        break;
+      default:
+        sb.append(c);
       }
     }
     return sb.toString();
   }
 
   @Override
-  public T get( K key, String[] fields ) {
+  public T get(K key, String[] fields) {
     ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set( CommonParams.QT, "/get" );
-    params.set( CommonParams.FL, toDelimitedString( fields, "," ) );
-    params.set( "id",  key.toString() );
-    try {
-      QueryResponse rsp = server.query( params );
-      Object o = rsp.getResponse().get( "doc" );
-      if ( o == null ) {
+    params.set(CommonParams.QT, "/get");
+    params.set(CommonParams.FL, toDelimitedString(fields, ","));
+    params.set("id", key.toString());
+    try {
+      QueryResponse rsp = server.query(params);
+      Object o = rsp.getResponse().get("doc");
+      if (o == null) {
         return null;
       }
-      return newInstance( (SolrDocument)o, fields );
-    } catch ( Exception e ) {
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      return newInstance((SolrDocument) o, fields);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e.getStackTrace().toString());
     }
     return null;
   }
 
-  public T newInstance( SolrDocument doc, String[] fields )
-      throws IOException {
+  public T newInstance(SolrDocument doc, String[] fields) throws IOException {
     T persistent = newPersistent();
-    if ( fields == null ) {
-      fields = fieldMap.keySet().toArray( new String[fieldMap.size()] );
+    if (fields == null) {
+      fields = fieldMap.keySet().toArray(new String[fieldMap.size()]);
     }
     String pk = mapping.getPrimaryKey();
-    for ( String f : fields ) {
-      Field field = fieldMap.get( f );
+    for (String f : fields) {
+      Field field = fieldMap.get(f);
       Schema fieldSchema = field.schema();
       String sf = null;
-      if ( pk.equals( f ) ) {
+      if (pk.equals(f)) {
         sf = f;
       } else {
-        sf = mapping.getSolrField( f );                
+        sf = mapping.getSolrField(f);
       }
-      Object sv = doc.get( sf );
-      Object v;
-      if ( sv == null ) {
+      Object sv = doc.get(sf);
+      if (sv == null) {
         continue;
       }
-      switch ( fieldSchema.getType() ) {
-        case MAP:
-        case ARRAY:
-        case RECORD:
-          v = IOUtils.deserialize( (byte[]) sv, datumReader, fieldSchema, 
persistent.get( field.pos() ) );
-          persistent.put( field.pos(), v );
-          break;
-        case ENUM:
-          v = AvroUtils.getEnumValue( fieldSchema, (String) sv );
-          persistent.put( field.pos(), v );
-          break;
-        case FIXED:
-          throw new IOException( "???" );
-          // break;
-        case BYTES:
-          persistent.put( field.pos(), ByteBuffer.wrap( (byte[]) sv ) );
-          break;
-        case BOOLEAN:
-        case DOUBLE:
-        case FLOAT:
-        case INT:
-        case LONG:
-          persistent.put( field.pos(), sv );
-          break;
-        case STRING:
-          persistent.put( field.pos(), new Utf8( sv.toString() ) );
-          break;
-        case UNION:
-          LOG.error( "Union is not supported yet" );
-          break;
-        default:
-          LOG.error( "Unknown field type: " + fieldSchema.getType() );
-      }
-      persistent.setDirty( field.pos() );
+
+      Object v = deserializeFieldValue(field, fieldSchema, sv, persistent);
+      persistent.put(field.pos(), v);
+      persistent.setDirty(field.pos());
+
     }
     persistent.clearDirty();
     return persistent;
   }
 
+  @SuppressWarnings("rawtypes")
+  private SpecificDatumReader getDatumReader(String schemaId, Schema 
fieldSchema) {
+    SpecificDatumReader<?> reader = (SpecificDatumReader<?>) readerMap
+        .get(schemaId);
+    if (reader == null) {
+      reader = new SpecificDatumReader(fieldSchema);// ignore dirty bits
+      SpecificDatumReader localReader = null;
+      if ((localReader = readerMap.putIfAbsent(schemaId, reader)) != null) {
+        reader = localReader;
+      }
+    }
+    return reader;
+  }
+
+  @SuppressWarnings("rawtypes")
+  private SpecificDatumWriter getDatumWriter(String schemaId, Schema 
fieldSchema) {
+    SpecificDatumWriter writer = (SpecificDatumWriter<?>) writerMap
+        .get(schemaId);
+    if (writer == null) {
+      writer = new SpecificDatumWriter(fieldSchema);// ignore dirty bits
+      writerMap.put(schemaId, writer);
+    }
+
+    return writer;
+  }
+
+  private Object deserializeFieldValue(Field field, Schema fieldSchema,
+      Object solrValue, T persistent) throws IOException {
+    Object fieldValue = null;
+    switch (fieldSchema.getType()) {
+    case MAP:
+    case ARRAY:
+    case RECORD:
+      SpecificDatumReader reader = getDatumReader(fieldSchema.getFullName(),
+          fieldSchema);
+      fieldValue = IOUtils.deserialize((byte[]) solrValue, reader, fieldSchema,
+          persistent.get(field.pos()));
+      break;
+    case ENUM:
+      fieldValue = AvroUtils.getEnumValue(fieldSchema, (String) solrValue);
+      break;
+    case FIXED:
+      throw new IOException("???");
+      // break;
+    case BYTES:
+      fieldValue = ByteBuffer.wrap((byte[]) solrValue);
+      break;
+    case STRING:
+      fieldValue = new Utf8(solrValue.toString());
+      break;
+    case UNION:
+      if (fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema)) {
+        // schema [type0, type1]
+        Type type0 = fieldSchema.getTypes().get(0).getType();
+        Type type1 = fieldSchema.getTypes().get(1).getType();
+
+        // Check if types are different and there's a "null", like
+        // ["null","type"] or ["type","null"]
+        if (!type0.equals(type1)) {
+          if (type0.equals(Schema.Type.NULL))
+            fieldSchema = fieldSchema.getTypes().get(1);
+          else
+            fieldSchema = fieldSchema.getTypes().get(0);
+        } else {
+          fieldSchema = fieldSchema.getTypes().get(0);
+        }
+        fieldValue = deserializeFieldValue(field, fieldSchema, solrValue,
+            persistent);
+      } else {
+        SpecificDatumReader unionReader = getDatumReader(
+            String.valueOf(fieldSchema.hashCode()), fieldSchema);
+        fieldValue = IOUtils.deserialize((byte[]) solrValue, unionReader,
+            fieldSchema, persistent.get(field.pos()));
+        break;
+      }
+      break;
+    default:
+      fieldValue = solrValue;
+    }
+    return fieldValue;
+  }
+
   @Override
-  public void put( K key, T persistent ) {
+  public void put(K key, T persistent) {
     Schema schema = persistent.getSchema();
-    StateManager stateManager = persistent.getStateManager();
-    if ( !stateManager.isDirty( persistent ) ) {
+    if (!persistent.isDirty()) {
       // nothing to do
       return;
     }
     SolrInputDocument doc = new SolrInputDocument();
     // add primary key
-    doc.addField( mapping.getPrimaryKey(), key );
+    doc.addField(mapping.getPrimaryKey(), key);
     // populate the doc
     List<Field> fields = schema.getFields();
-    for ( Field field : fields ) {
-      String sf = mapping.getSolrField( field.name() );
+    for (Field field : fields) {
+      String sf = mapping.getSolrField(field.name());
       // Solr will append values to fields in a SolrInputDocument, even the key
       // mapping won't find the primary
-      if ( sf == null ) {
+      if (sf == null) {
         continue;
       }
       Schema fieldSchema = field.schema();
-      Object v = persistent.get( field.pos() );
-      if ( v == null ) {
+      Object v = persistent.get(field.pos());
+      if (v == null) {
         continue;
       }
-      switch ( fieldSchema.getType() ) {
-        case MAP:
-        case ARRAY:
-        case RECORD:
-          byte[] data = null;
-          try {
-            data = IOUtils.serialize( datumWriter, fieldSchema, v );
-          } catch ( IOException e ) {
-            LOG.error( e.getMessage(), e.getStackTrace().toString() );
-          }
-          doc.addField( sf, data );
-          break;
-        case BYTES:
-          doc.addField( sf, ( (ByteBuffer) v ).array() );
-          break;
-        case ENUM:
-        case STRING:
-          doc.addField( sf, v.toString() );
-          break;
-        case BOOLEAN:
-        case DOUBLE:
-        case FLOAT:
-        case INT:
-        case LONG:
-          doc.addField( sf, v );
-          break;
-        case UNION:
-          LOG.error( "Union is not supported yet" );
-          break;
-        default:
-          LOG.error( "Unknown field type: " + fieldSchema.getType() );
-      }
+      v = serializeFieldValue(fieldSchema, v);
+      doc.addField(sf, v);
+
     }
-    LOG.info( "DOCUMENT: " + doc );
-    batch.add( doc );
-    if ( batch.size() >= batchSize ) {
+    LOG.info("DOCUMENT: " + doc);
+    batch.add(doc);
+    if (batch.size() >= batchSize) {
       try {
-        add( batch, commitWithin );
+        add(batch, commitWithin);
         batch.clear();
-      } catch ( Exception e ) {
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e.getStackTrace().toString());
+      }
+    }
+  }
+
+  private Object serializeFieldValue(Schema fieldSchema, Object fieldValue) {
+    switch (fieldSchema.getType()) {
+    case MAP:
+    case ARRAY:
+    case RECORD:
+      byte[] data = null;
+      try {
+        SpecificDatumWriter writer = getDatumWriter(fieldSchema.getFullName(),
+            fieldSchema);
+        data = IOUtils.serialize(writer, fieldSchema, fieldValue);
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e.getStackTrace().toString());
+      }
+      fieldValue = data;
+      break;
+    case BYTES:
+      fieldValue = ((ByteBuffer) fieldValue).array();
+      break;
+    case ENUM:
+    case STRING:
+      fieldValue = fieldValue.toString();
+      break;
+    case UNION:
+      // TODO: If the field's schema is a union of null and a single other
+      // type, serialize using that inner type directly; all other unions
+      // are serialized whole.
+      if (fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema)) {
+        int schemaPos = getUnionSchema(fieldValue, fieldSchema);
+        Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
+        fieldValue = serializeFieldValue(unionSchema, fieldValue);
+      } else {
+        byte[] serilazeData = null;
+        try {
+          SpecificDatumWriter writer = getDatumWriter(
+              String.valueOf(fieldSchema.hashCode()), fieldSchema);
+          serilazeData = IOUtils.serialize(writer, fieldSchema, fieldValue);
+        } catch (IOException e) {
+          LOG.error(e.getMessage(), e.getStackTrace().toString());
+        }
+        fieldValue = serilazeData;
       }
+      break;
+    default:
+      // LOG.error("Unknown field type: " + fieldSchema.getType());
+      break;
     }
+    return fieldValue;
+  }
+
+  private boolean isNullable(Schema unionSchema) {
+    for (Schema innerSchema : unionSchema.getTypes()) {
+      if (innerSchema.getType().equals(Schema.Type.NULL)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Given an object and the object schema this function obtains, from within
+   * the UNION schema, the position of the type used. If no data type can be
+   * inferred then we return a default value of position 0.
+   * 
+   * @param pValue
+   * @param pUnionSchema
+   * @return the unionSchemaPosition.
+   */
+  private int getUnionSchema(Object pValue, Schema pUnionSchema) {
+    int unionSchemaPos = 0;
+    Iterator<Schema> it = pUnionSchema.getTypes().iterator();
+    while (it.hasNext()) {
+      Type schemaType = it.next().getType();
+      if (pValue instanceof Utf8 && schemaType.equals(Type.STRING))
+        return unionSchemaPos;
+      else if (pValue instanceof ByteBuffer && schemaType.equals(Type.BYTES))
+        return unionSchemaPos;
+      else if (pValue instanceof Integer && schemaType.equals(Type.INT))
+        return unionSchemaPos;
+      else if (pValue instanceof Long && schemaType.equals(Type.LONG))
+        return unionSchemaPos;
+      else if (pValue instanceof Double && schemaType.equals(Type.DOUBLE))
+        return unionSchemaPos;
+      else if (pValue instanceof Float && schemaType.equals(Type.FLOAT))
+        return unionSchemaPos;
+      else if (pValue instanceof Boolean && schemaType.equals(Type.BOOLEAN))
+        return unionSchemaPos;
+      else if (pValue instanceof Map && schemaType.equals(Type.MAP))
+        return unionSchemaPos;
+      else if (pValue instanceof List && schemaType.equals(Type.ARRAY))
+        return unionSchemaPos;
+      else if (pValue instanceof Persistent && schemaType.equals(Type.RECORD))
+        return unionSchemaPos;
+      unionSchemaPos++;
+    }
+    // if we weren't able to determine which data type it is, then we return 
the
+    // default
+    return DEFAULT_UNION_SCHEMA;
   }
 
   @Override
-  public boolean delete( K key ) {
+  public boolean delete(K key) {
     String keyField = mapping.getPrimaryKey();
     try {
-      UpdateResponse rsp = server.deleteByQuery( keyField + ":" + 
escapeQueryKey( key.toString() ) );
+      UpdateResponse rsp = server.deleteByQuery(keyField + ":"
+          + escapeQueryKey(key.toString()));
       server.commit();
-      LOG.info( rsp.toString() );
+      LOG.info(rsp.toString());
       return true;
-    } catch ( Exception e ) {
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e.getStackTrace().toString());
     }
     return false;
   }
 
   @Override
-  public long deleteByQuery( Query<K, T> query ) {
-    String q = ( (SolrQuery<K, T>) query ).toSolrQuery();
+  public long deleteByQuery(Query<K, T> query) {
+    String q = ((SolrQuery<K, T>) query).toSolrQuery();
     try {
-      UpdateResponse rsp = server.deleteByQuery( q );
+      UpdateResponse rsp = server.deleteByQuery(q);
       server.commit();
-      LOG.info( rsp.toString() );
-    } catch ( Exception e ) {
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      LOG.info(rsp.toString());
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e.getStackTrace().toString());
     }
     return 0;
   }
 
   @Override
-  public Result<K, T> execute( Query<K, T> query ) {
+  public Result<K, T> execute(Query<K, T> query) {
     try {
-      return new SolrResult<K, T>( this, query, server, resultsSize );
-    } catch ( IOException e ) {
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      return new SolrResult<K, T>(this, query, server, resultsSize);
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e.getStackTrace().toString());
     }
     return null;
   }
 
   @Override
   public Query<K, T> newQuery() {
-    return new SolrQuery<K, T>( this );
+    return new SolrQuery<K, T>(this);
   }
 
   @Override
-  public List<PartitionQuery<K, T>> getPartitions( Query<K, T> query )
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
       throws IOException {
     // TODO: implement this using Hadoop DB support
 
@@ -481,11 +640,11 @@ public class SolrStore<K, T extends Pers
   @Override
   public void flush() {
     try {
-      if ( batch.size() > 0 ) {
-        add( batch, commitWithin );
+      if (batch.size() > 0) {
+        add(batch, commitWithin);
         batch.clear();
       }
-    } catch ( Exception e ) {
+    } catch (Exception e) {
       LOG.error(e.getMessage(), e.getStackTrace());
     }
   }
@@ -494,15 +653,16 @@ public class SolrStore<K, T extends Pers
   public void close() {
     // In testing, the index gets closed before the commit in flush() can 
happen
     // so an exception gets thrown
-    //flush();
+    // flush();
   }
-    
-  private void add(ArrayList<SolrInputDocument> batch, int commitWithin) 
throws SolrServerException, IOException {
+
+  private void add(ArrayList<SolrInputDocument> batch, int commitWithin)
+      throws SolrServerException, IOException {
     if (commitWithin == 0) {
-      server.add( batch );
-      server.commit( false, true, true );
+      server.add(batch);
+      server.commit(false, true, true);
     } else {
-      server.add( batch, commitWithin );            
+      server.add(batch, commitWithin);
     }
-  }  
+  }
 }

Modified: gora/branches/GORA_94/gora-solr/src/test/conf/gora-solr-mapping.xml
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-solr/src/test/conf/gora-solr-mapping.xml?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-solr/src/test/conf/gora-solr-mapping.xml 
(original)
+++ gora/branches/GORA_94/gora-solr/src/test/conf/gora-solr-mapping.xml Sun Mar 
30 11:07:06 2014
@@ -22,6 +22,8 @@
     <field name="name" column="name"/>
     <field name="dateOfBirth" column="dateOfBirth"/>
     <field name="salary" column="salary"/>
+    <field name="boss" column="boss"/>
+    <field name="webpage" column="webpage"/>
   </class>
 
   <class name="org.apache.gora.examples.generated.WebPage" 
keyClass="java.lang.String" table="WebPage">
@@ -29,6 +31,7 @@
     <field name="content" column="content"/>
     <field name="parsedContent" column="parsedContent"/>
     <field name="outlinks" column="outlinks"/>
+    <field name="headers" column="headers"/>     
     <field name="metadata" column="metadata"/>
   </class>
 </gora-orm>

Modified: gora/branches/GORA_94/gora-solr/src/test/conf/gora.properties
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-solr/src/test/conf/gora.properties?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-solr/src/test/conf/gora.properties (original)
+++ gora/branches/GORA_94/gora-solr/src/test/conf/gora.properties Sun Mar 30 
11:07:06 2014
@@ -1 +1,2 @@
 gora.solrstore.solr.url=http://localhost:9876/solr
+gora.datastore.solr.commitWithin=0

Modified: 
gora/branches/GORA_94/gora-solr/src/test/conf/solr/Employee/conf/schema.xml
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-solr/src/test/conf/solr/Employee/conf/schema.xml?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-solr/src/test/conf/solr/Employee/conf/schema.xml 
(original)
+++ gora/branches/GORA_94/gora-solr/src/test/conf/solr/Employee/conf/schema.xml 
Sun Mar 30 11:07:06 2014
@@ -28,7 +28,9 @@
     <field name="name"        type="string" indexed="true" stored="true" />
     <field name="dateOfBirth" type="long" stored="true" /> 
     <field name="salary"      type="int" stored="true" /> 
-
+    <field name="boss"        type="binary" stored="true" />
+    <field name="webpage"     type="binary" stored="true" />
+    
   </fields>
 
   <uniqueKey>ssn</uniqueKey>
@@ -37,6 +39,7 @@
     <fieldType name="string" class="solr.StrField" sortMissingLast="true" />
     <fieldType name="int" class="solr.TrieIntField" precisionStep="0" 
positionIncrementGap="0"/>
     <fieldType name="long" class="solr.TrieLongField" precisionStep="0" 
positionIncrementGap="0"/>
+    <fieldType name="binary" class="solr.BinaryField"/>
   </types>  
 
 </schema>

Modified: 
gora/branches/GORA_94/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml 
(original)
+++ gora/branches/GORA_94/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml 
Sun Mar 30 11:07:06 2014
@@ -28,7 +28,8 @@
     <field name="parsedContent" type="binary" stored="true" /> 
     <field name="content"       type="binary" stored="true" /> 
     <field name="outlinks"      type="binary" stored="true" /> 
-    <field name="metadata"      type="binary" stored="true" />    
+    <field name="headers"       type="binary" stored="true" />     
+    <field name="metadata"      type="binary" stored="true" />
 
   </fields>
 

Modified: 
gora/branches/GORA_94/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java
 (original)
+++ 
gora/branches/GORA_94/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java
 Sun Mar 30 11:07:06 2014
@@ -25,6 +25,7 @@ import org.apache.gora.solr.GoraSolrTest
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.store.DataStoreTestBase;
+import org.junit.Ignore;
 
 public class TestSolrStore extends DataStoreTestBase {
   
@@ -49,46 +50,7 @@ public class TestSolrStore extends DataS
   }
 
 
-  public void testGetRecursive() {
-  }
-
-  public void testGetDoubleRecursive() {
-  }
-
-  public void testGetNested() {
-  }
-
-  public void testGet3UnionField() {
-  }
-
-  public void testQuery() {
-  }
-
-  public void testQueryStartKey() {
-  }
-
-  public void testQueryEndKey() {
-  }
-
-  public void testQueryKeyRange() {
-  }
-
-  public void testQueryWebPageSingleKey() {
-  }
-
-  public void testQueryWebPageSingleKeyDefaultFields() {
-  }
-
-  public void testDeleteByQuery() {
-  }
-
-  public void testGetPartitions() {
-  }
-
-  public void testUpdate() {
-  }
-
-  public void testDeleteByQueryFields() {
-  }
-
+  @Ignore("GORA-310 and GORA-311 issues are not fixed at SolrStore")
+  @Override
+  public void testDeleteByQueryFields() throws IOException {}
 }

Modified: gora/branches/GORA_94/pom.xml
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/pom.xml?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- gora/branches/GORA_94/pom.xml (original)
+++ gora/branches/GORA_94/pom.xml Sun Mar 30 11:07:06 2014
@@ -578,7 +578,7 @@
         <module>gora-hbase</module>
         <module>gora-accumulo</module>
         <module>gora-cassandra</module>
-        <!-- module>gora-solr</module-->
+        <module>gora-solr</module>
         <!--module>gora-dynamodb</module-->
         <!--module>gora-sql</module-->
         <module>gora-tutorial</module>
@@ -670,7 +670,7 @@
             <type>test-jar</type>
           </dependency>
           
-          <!--dependency>
+          <dependency>
             <groupId>org.apache.gora</groupId>
             <artifactId>gora-solr</artifactId>
             <version>${project.version}</version>
@@ -680,7 +680,7 @@
             <artifactId>gora-solr</artifactId>
             <version>${project.version}</version>
             <classifier>tests</classifier>
-          </dependency-->
+          </dependency>
 
           <dependency>
             <groupId>org.apache.gora</groupId>
@@ -905,7 +905,7 @@
             </dependency>
 
             <!-- Solr Dependencies -->
-            <!--dependency>
+            <dependency>
               <groupId>org.apache.solr</groupId>
               <artifactId>solr-core</artifactId>
               <version>${lucene-solr.version}</version>
@@ -1194,7 +1194,7 @@
               <groupId>com.carrotsearch.randomizedtesting</groupId>
               <artifactId>randomizedtesting-runner</artifactId>
               <version>2.0.10</version>
-            </dependency-->
+            </dependency>
               
             <!-- Amazon Dependencies -->
             <dependency>


Reply via email to