Author: lewismc
Date: Sun Mar 30 11:07:06 2014
New Revision: 1583119
URL: http://svn.apache.org/r1583119
Log:
GORA-303 Upgrade to Avro 1.7.X in gora-solr
Modified:
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/util/IOUtils.java
gora/branches/GORA_94/gora-solr/pom.xml
gora/branches/GORA_94/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java
gora/branches/GORA_94/gora-solr/src/test/conf/gora-solr-mapping.xml
gora/branches/GORA_94/gora-solr/src/test/conf/gora.properties
gora/branches/GORA_94/gora-solr/src/test/conf/solr/Employee/conf/schema.xml
gora/branches/GORA_94/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml
gora/branches/GORA_94/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java
gora/branches/GORA_94/pom.xml
Modified:
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/util/IOUtils.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/util/IOUtils.java?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/util/IOUtils.java
(original)
+++
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/util/IOUtils.java
Sun Mar 30 11:07:06 2014
@@ -165,6 +165,18 @@ public class IOUtils {
/**
* Serializes the field object using the datumWriter.
*/
+ public static<T> void serialize(OutputStream os,
+ SpecificDatumWriter<T> datumWriter, Schema schema, T object)
+ throws IOException {
+
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
+ datumWriter.write(object, encoder);
+ encoder.flush();
+ }
+
+ /**
+ * Serializes the field object using the datumWriter.
+ */
public static<T extends SpecificRecord> byte[]
serialize(SpecificDatumWriter<T> datumWriter
, Schema schema, T object) throws IOException {
ByteArrayOutputStream os = new ByteArrayOutputStream();
@@ -172,6 +184,16 @@ public class IOUtils {
return os.toByteArray();
}
+ /**
+ * Serializes the field object using the datumWriter.
+ */
+ public static<T> byte[] serialize(SpecificDatumWriter<T> datumWriter
+ , Schema schema, T object) throws IOException {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ serialize(os, datumWriter, schema, object);
+ return os.toByteArray();
+ }
+
/** Deserializes the object in the given datainput using
* available Hadoop serializations.
* @throws IOException
@@ -258,6 +280,16 @@ public class IOUtils {
}
/**
+ * Deserializes the field object using the datumReader.
+ */
+ public static<K, T> T deserialize(byte[] bytes,
+ SpecificDatumReader<T> datumReader, Schema schema, T object)
+ throws IOException {
+ decoder = DecoderFactory.get().binaryDecoder(bytes, decoder);
+ return (T)datumReader.read(object, decoder);
+ }
+
+ /**
* Writes a byte[] to the output, representing whether each given field is
null
* or not. A Vint and ceil( fields.length / 8 ) bytes are written to the
output.
* @param out the output to write to
Modified: gora/branches/GORA_94/gora-solr/pom.xml
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-solr/pom.xml?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-solr/pom.xml (original)
+++ gora/branches/GORA_94/gora-solr/pom.xml Sun Mar 30 11:07:06 2014
@@ -188,7 +188,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <scope>compile</scope>
+ <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
Modified:
gora/branches/GORA_94/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java
(original)
+++
gora/branches/GORA_94/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java
Sun Mar 30 11:07:06 2014
@@ -17,13 +17,20 @@ package org.apache.gora.solr.store;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
-import org.apache.gora.persistency.StateManager;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
@@ -53,8 +60,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K,
T> {
-
- private static final Logger LOG = LoggerFactory.getLogger( SolrStore.class );
+
+ private static final Logger LOG = LoggerFactory.getLogger(SolrStore.class);
protected static final String DEFAULT_MAPPING_FILE = "gora-solr-mapping.xml";
@@ -63,15 +70,15 @@ public class SolrStore<K, T extends Pers
protected static final String SOLR_CONFIG_PROPERTY = "solr.config";
protected static final String SOLR_SCHEMA_PROPERTY = "solr.schema";
-
+
protected static final String SOLR_BATCH_SIZE_PROPERTY = "solr.batchSize";
-
- //protected static final String SOLR_SOLRJSERVER_IMPL = "solr.solrjserver";
+
+ // protected static final String SOLR_SOLRJSERVER_IMPL = "solr.solrjserver";
protected static final String SOLR_COMMIT_WITHIN_PROPERTY =
"solr.commitWithin";
protected static final String SOLR_RESULTS_SIZE_PROPERTY =
"solr.resultsSize";
-
+
protected static final int DEFAULT_BATCH_SIZE = 100;
protected static final int DEFAULT_COMMIT_WITHIN = 1000;
@@ -92,85 +99,116 @@ public class SolrStore<K, T extends Pers
private int resultsSize = DEFAULT_RESULTS_SIZE;
- @Override
- public void initialize( Class<K> keyClass, Class<T> persistentClass,
Properties properties ) {
- super.initialize( keyClass, persistentClass, properties );
- try {
- String mappingFile = DataStoreFactory.getMappingFile( properties, this,
DEFAULT_MAPPING_FILE );
- mapping = readMapping( mappingFile );
- }
- catch ( IOException e ) {
- LOG.error( e.getMessage() );
- LOG.error( e.getStackTrace().toString() );
- }
-
- solrServerUrl = DataStoreFactory.findProperty( properties, this,
SOLR_URL_PROPERTY, null );
- solrConfig = DataStoreFactory.findProperty( properties, this,
SOLR_CONFIG_PROPERTY, null );
- solrSchema = DataStoreFactory.findProperty( properties, this,
SOLR_SCHEMA_PROPERTY, null );
- LOG.info( "Using Solr server at " + solrServerUrl );
- adminServer = new HttpSolrServer( solrServerUrl );
- server = new HttpSolrServer( solrServerUrl + "/" + mapping.getCoreName() );
- if ( autoCreateSchema ) {
+ /**
+ * Default schema index with value "0" used when AVRO Union data types are
+ * stored
+ */
+ public static int DEFAULT_UNION_SCHEMA = 0;
+
+ /*
+ * Create a shared concurrent map for the datum readers and writers. Datum
+ * readers and writers were not thread safe before Avro 1.4.0 (see AVRO-650);
+ * now that they are, a single reader and writer
pair
+ * can be maintained for every schema, instead of one for every thread.
+ */
+
+ public static final ConcurrentHashMap<String, SpecificDatumReader<?>>
readerMap = new ConcurrentHashMap<String, SpecificDatumReader<?>>();
+
+ public static final ConcurrentHashMap<String, SpecificDatumWriter<?>>
writerMap = new ConcurrentHashMap<String, SpecificDatumWriter<?>>();
+
+ @Override
+ public void initialize(Class<K> keyClass, Class<T> persistentClass,
+ Properties properties) {
+ super.initialize(keyClass, persistentClass, properties);
+ try {
+ String mappingFile = DataStoreFactory.getMappingFile(properties, this,
+ DEFAULT_MAPPING_FILE);
+ mapping = readMapping(mappingFile);
+ } catch (IOException e) {
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
+ }
+
+ solrServerUrl = DataStoreFactory.findProperty(properties, this,
+ SOLR_URL_PROPERTY, null);
+ solrConfig = DataStoreFactory.findProperty(properties, this,
+ SOLR_CONFIG_PROPERTY, null);
+ solrSchema = DataStoreFactory.findProperty(properties, this,
+ SOLR_SCHEMA_PROPERTY, null);
+ LOG.info("Using Solr server at " + solrServerUrl);
+ adminServer = new HttpSolrServer(solrServerUrl);
+ server = new HttpSolrServer(solrServerUrl + "/" + mapping.getCoreName());
+ if (autoCreateSchema) {
createSchema();
}
- String batchSizeString = DataStoreFactory.findProperty( properties, this,
SOLR_BATCH_SIZE_PROPERTY, null );
- if ( batchSizeString != null ) {
+ String batchSizeString = DataStoreFactory.findProperty(properties, this,
+ SOLR_BATCH_SIZE_PROPERTY, null);
+ if (batchSizeString != null) {
try {
- batchSize = Integer.parseInt( batchSizeString );
- } catch ( NumberFormatException nfe ) {
- LOG.warn( "Invalid batch size '" + batchSizeString + "', using default
" + DEFAULT_BATCH_SIZE );
+ batchSize = Integer.parseInt(batchSizeString);
+ } catch (NumberFormatException nfe) {
+ LOG.warn("Invalid batch size '" + batchSizeString + "', using default "
+ + DEFAULT_BATCH_SIZE);
}
}
- batch = new ArrayList<SolrInputDocument>( batchSize );
- String commitWithinString = DataStoreFactory.findProperty( properties,
this, SOLR_COMMIT_WITHIN_PROPERTY, null );
- if ( commitWithinString != null ) {
+ batch = new ArrayList<SolrInputDocument>(batchSize);
+ String commitWithinString = DataStoreFactory.findProperty(properties, this,
+ SOLR_COMMIT_WITHIN_PROPERTY, null);
+ if (commitWithinString != null) {
try {
- commitWithin = Integer.parseInt( commitWithinString );
- } catch ( NumberFormatException nfe ) {
- LOG.warn( "Invalid commit within '" + commitWithinString + "', using
default " + DEFAULT_COMMIT_WITHIN );
+ commitWithin = Integer.parseInt(commitWithinString);
+ } catch (NumberFormatException nfe) {
+ LOG.warn("Invalid commit within '" + commitWithinString
+ + "', using default " + DEFAULT_COMMIT_WITHIN);
}
}
- String resultsSizeString = DataStoreFactory.findProperty( properties,
this, SOLR_RESULTS_SIZE_PROPERTY, null );
- if ( resultsSizeString != null ) {
+ String resultsSizeString = DataStoreFactory.findProperty(properties, this,
+ SOLR_RESULTS_SIZE_PROPERTY, null);
+ if (resultsSizeString != null) {
try {
- resultsSize = Integer.parseInt( resultsSizeString );
- } catch ( NumberFormatException nfe ) {
- LOG.warn( "Invalid results size '" + resultsSizeString + "', using
default " + DEFAULT_RESULTS_SIZE );
+ resultsSize = Integer.parseInt(resultsSizeString);
+ } catch (NumberFormatException nfe) {
+ LOG.warn("Invalid results size '" + resultsSizeString
+ + "', using default " + DEFAULT_RESULTS_SIZE);
}
}
}
@SuppressWarnings("unchecked")
- private SolrMapping readMapping( String filename ) throws IOException {
+ private SolrMapping readMapping(String filename) throws IOException {
SolrMapping map = new SolrMapping();
try {
SAXBuilder builder = new SAXBuilder();
- Document doc = builder.build(
getClass().getClassLoader().getResourceAsStream( filename ) );
+ Document doc = builder.build(getClass().getClassLoader()
+ .getResourceAsStream(filename));
- List<Element> classes = doc.getRootElement().getChildren( "class" );
+ List<Element> classes = doc.getRootElement().getChildren("class");
- for ( Element classElement : classes ) {
- if ( classElement.getAttributeValue( "keyClass" ).equals(
keyClass.getCanonicalName() )
- && classElement.getAttributeValue( "name" ).equals(
persistentClass.getCanonicalName() ) ) {
-
- String tableName = getSchemaName( classElement.getAttributeValue(
"table" ), persistentClass );
- map.setCoreName( tableName );
-
- Element primaryKeyEl = classElement.getChild( "primarykey" );
- map.setPrimaryKey( primaryKeyEl.getAttributeValue( "column" ) );
-
- List<Element> fields = classElement.getChildren( "field" );
-
- for ( Element field : fields ) {
- String fieldName = field.getAttributeValue( "name" );
- String columnName = field.getAttributeValue( "column" );
- map.addField( fieldName, columnName );
+ for (Element classElement : classes) {
+ if (classElement.getAttributeValue("keyClass").equals(
+ keyClass.getCanonicalName())
+ && classElement.getAttributeValue("name").equals(
+ persistentClass.getCanonicalName())) {
+
+ String tableName = getSchemaName(
+ classElement.getAttributeValue("table"), persistentClass);
+ map.setCoreName(tableName);
+
+ Element primaryKeyEl = classElement.getChild("primarykey");
+ map.setPrimaryKey(primaryKeyEl.getAttributeValue("column"));
+
+ List<Element> fields = classElement.getChildren("field");
+
+ for (Element field : fields) {
+ String fieldName = field.getAttributeValue("name");
+ String columnName = field.getAttributeValue("column");
+ map.addField(fieldName, columnName);
}
break;
}
}
- } catch ( Exception ex ) {
- throw new IOException( ex );
+ } catch (Exception ex) {
+ throw new IOException(ex);
}
return map;
@@ -188,11 +226,11 @@ public class SolrStore<K, T extends Pers
@Override
public void createSchema() {
try {
- if ( !schemaExists() )
- CoreAdminRequest.createCore( mapping.getCoreName(),
mapping.getCoreName(), adminServer, solrConfig,
- solrSchema );
- } catch ( Exception e ) {
- LOG.error( e.getMessage(), e.getStackTrace().toString() );
+ if (!schemaExists())
+ CoreAdminRequest.createCore(mapping.getCoreName(),
+ mapping.getCoreName(), adminServer, solrConfig, solrSchema);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e.getStackTrace().toString());
}
}
@@ -200,11 +238,11 @@ public class SolrStore<K, T extends Pers
/** Default implementation deletes and recreates the schema*/
public void truncateSchema() {
try {
- server.deleteByQuery( "*:*" );
+ server.deleteByQuery("*:*");
server.commit();
- } catch ( Exception e ) {
+ } catch (Exception e) {
// ignore?
- LOG.error( e.getMessage(), e.getStackTrace().toString() );
+ LOG.error(e.getMessage(), e.getStackTrace().toString());
}
}
@@ -212,20 +250,20 @@ public class SolrStore<K, T extends Pers
public void deleteSchema() {
// XXX should this be only in truncateSchema ???
try {
- server.deleteByQuery( "*:*" );
+ server.deleteByQuery("*:*");
server.commit();
- } catch ( Exception e ) {
- // ignore?
- // LOG.error(e.getMessage());
- // LOG.error(e.getStackTrace().toString());
+ } catch (Exception e) {
+ // ignore?
+ // LOG.error(e.getMessage());
+ // LOG.error(e.getStackTrace().toString());
}
try {
- CoreAdminRequest.unloadCore( mapping.getCoreName(), adminServer );
- } catch ( Exception e ) {
- if ( e.getMessage().contains( "No such core" ) ) {
+ CoreAdminRequest.unloadCore(mapping.getCoreName(), adminServer);
+ } catch (Exception e) {
+ if (e.getMessage().contains("No such core")) {
return; // it's ok, the core is not there
} else {
- LOG.error( e.getMessage(), e.getStackTrace().toString() );
+ LOG.error(e.getMessage(), e.getStackTrace().toString());
}
}
}
@@ -234,239 +272,360 @@ public class SolrStore<K, T extends Pers
public boolean schemaExists() {
boolean exists = false;
try {
- CoreAdminResponse rsp = CoreAdminRequest.getStatus(
mapping.getCoreName(), adminServer );
- exists = rsp.getUptime( mapping.getCoreName() ) != null;
- } catch ( Exception e ) {
- LOG.error( e.getMessage(), e.getStackTrace().toString() );
+ CoreAdminResponse rsp = CoreAdminRequest.getStatus(mapping.getCoreName(),
+ adminServer);
+ exists = rsp.getUptime(mapping.getCoreName()) != null;
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e.getStackTrace().toString());
}
return exists;
}
- private static final String toDelimitedString( String[] arr, String sep ) {
- if ( arr == null || arr.length == 0 ) {
+ private static final String toDelimitedString(String[] arr, String sep) {
+ if (arr == null || arr.length == 0) {
return "";
}
StringBuilder sb = new StringBuilder();
- for ( int i = 0; i < arr.length; i++ ) {
- if ( i > 0 )
- sb.append( sep );
- sb.append( arr[i] );
+ for (int i = 0; i < arr.length; i++) {
+ if (i > 0)
+ sb.append(sep);
+ sb.append(arr[i]);
}
return sb.toString();
}
- public static String escapeQueryKey( String key ) {
- if ( key == null ) {
+ public static String escapeQueryKey(String key) {
+ if (key == null) {
return null;
}
StringBuilder sb = new StringBuilder();
- for ( int i = 0; i < key.length(); i++ ) {
- char c = key.charAt( i );
- switch ( c ) {
- case ':':
- case '*':
- sb.append( "\\" + c );
- break;
- default:
- sb.append( c );
+ for (int i = 0; i < key.length(); i++) {
+ char c = key.charAt(i);
+ switch (c) {
+ case ':':
+ case '*':
+ sb.append("\\" + c);
+ break;
+ default:
+ sb.append(c);
}
}
return sb.toString();
}
@Override
- public T get( K key, String[] fields ) {
+ public T get(K key, String[] fields) {
ModifiableSolrParams params = new ModifiableSolrParams();
- params.set( CommonParams.QT, "/get" );
- params.set( CommonParams.FL, toDelimitedString( fields, "," ) );
- params.set( "id", key.toString() );
- try {
- QueryResponse rsp = server.query( params );
- Object o = rsp.getResponse().get( "doc" );
- if ( o == null ) {
+ params.set(CommonParams.QT, "/get");
+ params.set(CommonParams.FL, toDelimitedString(fields, ","));
+ params.set("id", key.toString());
+ try {
+ QueryResponse rsp = server.query(params);
+ Object o = rsp.getResponse().get("doc");
+ if (o == null) {
return null;
}
- return newInstance( (SolrDocument)o, fields );
- } catch ( Exception e ) {
- LOG.error( e.getMessage(), e.getStackTrace().toString() );
+ return newInstance((SolrDocument) o, fields);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e.getStackTrace().toString());
}
return null;
}
- public T newInstance( SolrDocument doc, String[] fields )
- throws IOException {
+ public T newInstance(SolrDocument doc, String[] fields) throws IOException {
T persistent = newPersistent();
- if ( fields == null ) {
- fields = fieldMap.keySet().toArray( new String[fieldMap.size()] );
+ if (fields == null) {
+ fields = fieldMap.keySet().toArray(new String[fieldMap.size()]);
}
String pk = mapping.getPrimaryKey();
- for ( String f : fields ) {
- Field field = fieldMap.get( f );
+ for (String f : fields) {
+ Field field = fieldMap.get(f);
Schema fieldSchema = field.schema();
String sf = null;
- if ( pk.equals( f ) ) {
+ if (pk.equals(f)) {
sf = f;
} else {
- sf = mapping.getSolrField( f );
+ sf = mapping.getSolrField(f);
}
- Object sv = doc.get( sf );
- Object v;
- if ( sv == null ) {
+ Object sv = doc.get(sf);
+ if (sv == null) {
continue;
}
- switch ( fieldSchema.getType() ) {
- case MAP:
- case ARRAY:
- case RECORD:
- v = IOUtils.deserialize( (byte[]) sv, datumReader, fieldSchema,
persistent.get( field.pos() ) );
- persistent.put( field.pos(), v );
- break;
- case ENUM:
- v = AvroUtils.getEnumValue( fieldSchema, (String) sv );
- persistent.put( field.pos(), v );
- break;
- case FIXED:
- throw new IOException( "???" );
- // break;
- case BYTES:
- persistent.put( field.pos(), ByteBuffer.wrap( (byte[]) sv ) );
- break;
- case BOOLEAN:
- case DOUBLE:
- case FLOAT:
- case INT:
- case LONG:
- persistent.put( field.pos(), sv );
- break;
- case STRING:
- persistent.put( field.pos(), new Utf8( sv.toString() ) );
- break;
- case UNION:
- LOG.error( "Union is not supported yet" );
- break;
- default:
- LOG.error( "Unknown field type: " + fieldSchema.getType() );
- }
- persistent.setDirty( field.pos() );
+
+ Object v = deserializeFieldValue(field, fieldSchema, sv, persistent);
+ persistent.put(field.pos(), v);
+ persistent.setDirty(field.pos());
+
}
persistent.clearDirty();
return persistent;
}
+ @SuppressWarnings("rawtypes")
+ private SpecificDatumReader getDatumReader(String schemaId, Schema
fieldSchema) {
+ SpecificDatumReader<?> reader = (SpecificDatumReader<?>) readerMap
+ .get(schemaId);
+ if (reader == null) {
+ reader = new SpecificDatumReader(fieldSchema);// ignore dirty bits
+ SpecificDatumReader localReader = null;
+ if ((localReader = readerMap.putIfAbsent(schemaId, reader)) != null) {
+ reader = localReader;
+ }
+ }
+ return reader;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private SpecificDatumWriter getDatumWriter(String schemaId, Schema
fieldSchema) {
+ SpecificDatumWriter writer = (SpecificDatumWriter<?>) writerMap
+ .get(schemaId);
+ if (writer == null) {
+ writer = new SpecificDatumWriter(fieldSchema);// ignore dirty bits
+ writerMap.put(schemaId, writer);
+ }
+
+ return writer;
+ }
+
+ private Object deserializeFieldValue(Field field, Schema fieldSchema,
+ Object solrValue, T persistent) throws IOException {
+ Object fieldValue = null;
+ switch (fieldSchema.getType()) {
+ case MAP:
+ case ARRAY:
+ case RECORD:
+ SpecificDatumReader reader = getDatumReader(fieldSchema.getFullName(),
+ fieldSchema);
+ fieldValue = IOUtils.deserialize((byte[]) solrValue, reader, fieldSchema,
+ persistent.get(field.pos()));
+ break;
+ case ENUM:
+ fieldValue = AvroUtils.getEnumValue(fieldSchema, (String) solrValue);
+ break;
+ case FIXED:
+ throw new IOException("???");
+ // break;
+ case BYTES:
+ fieldValue = ByteBuffer.wrap((byte[]) solrValue);
+ break;
+ case STRING:
+ fieldValue = new Utf8(solrValue.toString());
+ break;
+ case UNION:
+ if (fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema)) {
+ // schema [type0, type1]
+ Type type0 = fieldSchema.getTypes().get(0).getType();
+ Type type1 = fieldSchema.getTypes().get(1).getType();
+
+ // Check if types are different and there's a "null", like
+ // ["null","type"] or ["type","null"]
+ if (!type0.equals(type1)) {
+ if (type0.equals(Schema.Type.NULL))
+ fieldSchema = fieldSchema.getTypes().get(1);
+ else
+ fieldSchema = fieldSchema.getTypes().get(0);
+ } else {
+ fieldSchema = fieldSchema.getTypes().get(0);
+ }
+ fieldValue = deserializeFieldValue(field, fieldSchema, solrValue,
+ persistent);
+ } else {
+ SpecificDatumReader unionReader = getDatumReader(
+ String.valueOf(fieldSchema.hashCode()), fieldSchema);
+ fieldValue = IOUtils.deserialize((byte[]) solrValue, unionReader,
+ fieldSchema, persistent.get(field.pos()));
+ break;
+ }
+ break;
+ default:
+ fieldValue = solrValue;
+ }
+ return fieldValue;
+ }
+
@Override
- public void put( K key, T persistent ) {
+ public void put(K key, T persistent) {
Schema schema = persistent.getSchema();
- StateManager stateManager = persistent.getStateManager();
- if ( !stateManager.isDirty( persistent ) ) {
+ if (!persistent.isDirty()) {
// nothing to do
return;
}
SolrInputDocument doc = new SolrInputDocument();
// add primary key
- doc.addField( mapping.getPrimaryKey(), key );
+ doc.addField(mapping.getPrimaryKey(), key);
// populate the doc
List<Field> fields = schema.getFields();
- for ( Field field : fields ) {
- String sf = mapping.getSolrField( field.name() );
+ for (Field field : fields) {
+ String sf = mapping.getSolrField(field.name());
// Solr will append values to fields in a SolrInputDocument, even the key
// mapping won't find the primary
- if ( sf == null ) {
+ if (sf == null) {
continue;
}
Schema fieldSchema = field.schema();
- Object v = persistent.get( field.pos() );
- if ( v == null ) {
+ Object v = persistent.get(field.pos());
+ if (v == null) {
continue;
}
- switch ( fieldSchema.getType() ) {
- case MAP:
- case ARRAY:
- case RECORD:
- byte[] data = null;
- try {
- data = IOUtils.serialize( datumWriter, fieldSchema, v );
- } catch ( IOException e ) {
- LOG.error( e.getMessage(), e.getStackTrace().toString() );
- }
- doc.addField( sf, data );
- break;
- case BYTES:
- doc.addField( sf, ( (ByteBuffer) v ).array() );
- break;
- case ENUM:
- case STRING:
- doc.addField( sf, v.toString() );
- break;
- case BOOLEAN:
- case DOUBLE:
- case FLOAT:
- case INT:
- case LONG:
- doc.addField( sf, v );
- break;
- case UNION:
- LOG.error( "Union is not supported yet" );
- break;
- default:
- LOG.error( "Unknown field type: " + fieldSchema.getType() );
- }
+ v = serializeFieldValue(fieldSchema, v);
+ doc.addField(sf, v);
+
}
- LOG.info( "DOCUMENT: " + doc );
- batch.add( doc );
- if ( batch.size() >= batchSize ) {
+ LOG.info("DOCUMENT: " + doc);
+ batch.add(doc);
+ if (batch.size() >= batchSize) {
try {
- add( batch, commitWithin );
+ add(batch, commitWithin);
batch.clear();
- } catch ( Exception e ) {
- LOG.error( e.getMessage(), e.getStackTrace().toString() );
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e.getStackTrace().toString());
+ }
+ }
+ }
+
+ private Object serializeFieldValue(Schema fieldSchema, Object fieldValue) {
+ switch (fieldSchema.getType()) {
+ case MAP:
+ case ARRAY:
+ case RECORD:
+ byte[] data = null;
+ try {
+ SpecificDatumWriter writer = getDatumWriter(fieldSchema.getFullName(),
+ fieldSchema);
+ data = IOUtils.serialize(writer, fieldSchema, fieldValue);
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e.getStackTrace().toString());
+ }
+ fieldValue = data;
+ break;
+ case BYTES:
+ fieldValue = ((ByteBuffer) fieldValue).array();
+ break;
+ case ENUM:
+ case STRING:
+ fieldValue = fieldValue.toString();
+ break;
+ case UNION:
+ // TODO: If the field's schema is a union of "null" and one other type,
+ // unwrap it and serialize the inner type; all other unions are serialized whole.
+ if (fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema)) {
+ int schemaPos = getUnionSchema(fieldValue, fieldSchema);
+ Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
+ fieldValue = serializeFieldValue(unionSchema, fieldValue);
+ } else {
+ byte[] serilazeData = null;
+ try {
+ SpecificDatumWriter writer = getDatumWriter(
+ String.valueOf(fieldSchema.hashCode()), fieldSchema);
+ serilazeData = IOUtils.serialize(writer, fieldSchema, fieldValue);
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e.getStackTrace().toString());
+ }
+ fieldValue = serilazeData;
}
+ break;
+ default:
+ // LOG.error("Unknown field type: " + fieldSchema.getType());
+ break;
}
+ return fieldValue;
+ }
+
+ private boolean isNullable(Schema unionSchema) {
+ for (Schema innerSchema : unionSchema.getTypes()) {
+ if (innerSchema.getType().equals(Schema.Type.NULL)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Given an object and the object schema this function obtains, from within
+ * the UNION schema, the position of the type used. If no data type can be
+ * inferred then we return a default value of position 0.
+ *
+ * @param pValue
+ * @param pUnionSchema
+ * @return the unionSchemaPosition.
+ */
+ private int getUnionSchema(Object pValue, Schema pUnionSchema) {
+ int unionSchemaPos = 0;
+ Iterator<Schema> it = pUnionSchema.getTypes().iterator();
+ while (it.hasNext()) {
+ Type schemaType = it.next().getType();
+ if (pValue instanceof Utf8 && schemaType.equals(Type.STRING))
+ return unionSchemaPos;
+ else if (pValue instanceof ByteBuffer && schemaType.equals(Type.BYTES))
+ return unionSchemaPos;
+ else if (pValue instanceof Integer && schemaType.equals(Type.INT))
+ return unionSchemaPos;
+ else if (pValue instanceof Long && schemaType.equals(Type.LONG))
+ return unionSchemaPos;
+ else if (pValue instanceof Double && schemaType.equals(Type.DOUBLE))
+ return unionSchemaPos;
+ else if (pValue instanceof Float && schemaType.equals(Type.FLOAT))
+ return unionSchemaPos;
+ else if (pValue instanceof Boolean && schemaType.equals(Type.BOOLEAN))
+ return unionSchemaPos;
+ else if (pValue instanceof Map && schemaType.equals(Type.MAP))
+ return unionSchemaPos;
+ else if (pValue instanceof List && schemaType.equals(Type.ARRAY))
+ return unionSchemaPos;
+ else if (pValue instanceof Persistent && schemaType.equals(Type.RECORD))
+ return unionSchemaPos;
+ unionSchemaPos++;
+ }
+ // if we weren't able to determine which data type it is, then we return
the
+ // default
+ return DEFAULT_UNION_SCHEMA;
}
@Override
- public boolean delete( K key ) {
+ public boolean delete(K key) {
String keyField = mapping.getPrimaryKey();
try {
- UpdateResponse rsp = server.deleteByQuery( keyField + ":" +
escapeQueryKey( key.toString() ) );
+ UpdateResponse rsp = server.deleteByQuery(keyField + ":"
+ + escapeQueryKey(key.toString()));
server.commit();
- LOG.info( rsp.toString() );
+ LOG.info(rsp.toString());
return true;
- } catch ( Exception e ) {
- LOG.error( e.getMessage(), e.getStackTrace().toString() );
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e.getStackTrace().toString());
}
return false;
}
@Override
- public long deleteByQuery( Query<K, T> query ) {
- String q = ( (SolrQuery<K, T>) query ).toSolrQuery();
+ public long deleteByQuery(Query<K, T> query) {
+ String q = ((SolrQuery<K, T>) query).toSolrQuery();
try {
- UpdateResponse rsp = server.deleteByQuery( q );
+ UpdateResponse rsp = server.deleteByQuery(q);
server.commit();
- LOG.info( rsp.toString() );
- } catch ( Exception e ) {
- LOG.error( e.getMessage(), e.getStackTrace().toString() );
+ LOG.info(rsp.toString());
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e.getStackTrace().toString());
}
return 0;
}
@Override
- public Result<K, T> execute( Query<K, T> query ) {
+ public Result<K, T> execute(Query<K, T> query) {
try {
- return new SolrResult<K, T>( this, query, server, resultsSize );
- } catch ( IOException e ) {
- LOG.error( e.getMessage(), e.getStackTrace().toString() );
+ return new SolrResult<K, T>(this, query, server, resultsSize);
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e.getStackTrace().toString());
}
return null;
}
@Override
public Query<K, T> newQuery() {
- return new SolrQuery<K, T>( this );
+ return new SolrQuery<K, T>(this);
}
@Override
- public List<PartitionQuery<K, T>> getPartitions( Query<K, T> query )
+ public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
throws IOException {
// TODO: implement this using Hadoop DB support
@@ -481,11 +640,11 @@ public class SolrStore<K, T extends Pers
@Override
public void flush() {
try {
- if ( batch.size() > 0 ) {
- add( batch, commitWithin );
+ if (batch.size() > 0) {
+ add(batch, commitWithin);
batch.clear();
}
- } catch ( Exception e ) {
+ } catch (Exception e) {
LOG.error(e.getMessage(), e.getStackTrace());
}
}
@@ -494,15 +653,16 @@ public class SolrStore<K, T extends Pers
public void close() {
// In testing, the index gets closed before the commit in flush() can
happen
// so an exception gets thrown
- //flush();
+ // flush();
}
-
- private void add(ArrayList<SolrInputDocument> batch, int commitWithin)
throws SolrServerException, IOException {
+
+ private void add(ArrayList<SolrInputDocument> batch, int commitWithin)
+ throws SolrServerException, IOException {
if (commitWithin == 0) {
- server.add( batch );
- server.commit( false, true, true );
+ server.add(batch);
+ server.commit(false, true, true);
} else {
- server.add( batch, commitWithin );
+ server.add(batch, commitWithin);
}
- }
+ }
}
Modified: gora/branches/GORA_94/gora-solr/src/test/conf/gora-solr-mapping.xml
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-solr/src/test/conf/gora-solr-mapping.xml?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-solr/src/test/conf/gora-solr-mapping.xml
(original)
+++ gora/branches/GORA_94/gora-solr/src/test/conf/gora-solr-mapping.xml Sun Mar
30 11:07:06 2014
@@ -22,6 +22,8 @@
<field name="name" column="name"/>
<field name="dateOfBirth" column="dateOfBirth"/>
<field name="salary" column="salary"/>
+ <field name="boss" column="boss"/>
+ <field name="webpage" column="webpage"/>
</class>
<class name="org.apache.gora.examples.generated.WebPage"
keyClass="java.lang.String" table="WebPage">
@@ -29,6 +31,7 @@
<field name="content" column="content"/>
<field name="parsedContent" column="parsedContent"/>
<field name="outlinks" column="outlinks"/>
+ <field name="headers" column="headers"/>
<field name="metadata" column="metadata"/>
</class>
</gora-orm>
Modified: gora/branches/GORA_94/gora-solr/src/test/conf/gora.properties
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-solr/src/test/conf/gora.properties?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-solr/src/test/conf/gora.properties (original)
+++ gora/branches/GORA_94/gora-solr/src/test/conf/gora.properties Sun Mar 30
11:07:06 2014
@@ -1 +1,2 @@
gora.solrstore.solr.url=http://localhost:9876/solr
+gora.datastore.solr.commitWithin=0
Modified:
gora/branches/GORA_94/gora-solr/src/test/conf/solr/Employee/conf/schema.xml
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-solr/src/test/conf/solr/Employee/conf/schema.xml?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-solr/src/test/conf/solr/Employee/conf/schema.xml
(original)
+++ gora/branches/GORA_94/gora-solr/src/test/conf/solr/Employee/conf/schema.xml
Sun Mar 30 11:07:06 2014
@@ -28,7 +28,9 @@
<field name="name" type="string" indexed="true" stored="true" />
<field name="dateOfBirth" type="long" stored="true" />
<field name="salary" type="int" stored="true" />
-
+ <field name="boss" type="binary" stored="true" />
+ <field name="webpage" type="binary" stored="true" />
+
</fields>
<uniqueKey>ssn</uniqueKey>
@@ -37,6 +39,7 @@
<fieldType name="string" class="solr.StrField" sortMissingLast="true" />
<fieldType name="int" class="solr.TrieIntField" precisionStep="0"
positionIncrementGap="0"/>
<fieldType name="long" class="solr.TrieLongField" precisionStep="0"
positionIncrementGap="0"/>
+ <fieldType name="binary" class="solr.BinaryField"/>
</types>
</schema>
Modified:
gora/branches/GORA_94/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml
(original)
+++ gora/branches/GORA_94/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml
Sun Mar 30 11:07:06 2014
@@ -28,7 +28,8 @@
<field name="parsedContent" type="binary" stored="true" />
<field name="content" type="binary" stored="true" />
<field name="outlinks" type="binary" stored="true" />
- <field name="metadata" type="binary" stored="true" />
+ <field name="headers" type="binary" stored="true" />
+ <field name="metadata" type="binary" stored="true" />
</fields>
Modified:
gora/branches/GORA_94/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java
(original)
+++
gora/branches/GORA_94/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java
Sun Mar 30 11:07:06 2014
@@ -25,6 +25,7 @@ import org.apache.gora.solr.GoraSolrTest
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.store.DataStoreTestBase;
+import org.junit.Ignore;
public class TestSolrStore extends DataStoreTestBase {
@@ -49,46 +50,7 @@ public class TestSolrStore extends DataS
}
- public void testGetRecursive() {
- }
-
- public void testGetDoubleRecursive() {
- }
-
- public void testGetNested() {
- }
-
- public void testGet3UnionField() {
- }
-
- public void testQuery() {
- }
-
- public void testQueryStartKey() {
- }
-
- public void testQueryEndKey() {
- }
-
- public void testQueryKeyRange() {
- }
-
- public void testQueryWebPageSingleKey() {
- }
-
- public void testQueryWebPageSingleKeyDefaultFields() {
- }
-
- public void testDeleteByQuery() {
- }
-
- public void testGetPartitions() {
- }
-
- public void testUpdate() {
- }
-
- public void testDeleteByQueryFields() {
- }
-
+ @Ignore("GORA-310 and GORA-311 issues are not fixed at SolrStore")
+ @Override
+ public void testDeleteByQueryFields() throws IOException {}
}
Modified: gora/branches/GORA_94/pom.xml
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/pom.xml?rev=1583119&r1=1583118&r2=1583119&view=diff
==============================================================================
--- gora/branches/GORA_94/pom.xml (original)
+++ gora/branches/GORA_94/pom.xml Sun Mar 30 11:07:06 2014
@@ -578,7 +578,7 @@
<module>gora-hbase</module>
<module>gora-accumulo</module>
<module>gora-cassandra</module>
- <!-- module>gora-solr</module-->
+ <module>gora-solr</module>
<!--module>gora-dynamodb</module-->
<!--module>gora-sql</module-->
<module>gora-tutorial</module>
@@ -670,7 +670,7 @@
<type>test-jar</type>
</dependency>
- <!--dependency>
+ <dependency>
<groupId>org.apache.gora</groupId>
<artifactId>gora-solr</artifactId>
<version>${project.version}</version>
@@ -680,7 +680,7 @@
<artifactId>gora-solr</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
- </dependency-->
+ </dependency>
<dependency>
<groupId>org.apache.gora</groupId>
@@ -905,7 +905,7 @@
</dependency>
<!-- Solr Dependencies -->
- <!--dependency>
+ <dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-core</artifactId>
<version>${lucene-solr.version}</version>
@@ -1194,7 +1194,7 @@
<groupId>com.carrotsearch.randomizedtesting</groupId>
<artifactId>randomizedtesting-runner</artifactId>
<version>2.0.10</version>
- </dependency-->
+ </dependency>
<!-- Amazon Dependencies -->
<dependency>