Author: tomwhite
Date: Thu Sep 3 01:36:33 2009
New Revision: 810756
URL: http://svn.apache.org/viewvc?rev=810756&view=rev
Log:
HADOOP-6165. Add metadata to Serializations.
Added:
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerBase.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacyDeserializer.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacySerializer.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializationBase.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializerBase.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/java/core-default.xml
hadoop/common/trunk/src/java/org/apache/hadoop/io/DefaultStringifier.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Deserializer.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Serialization.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Serializer.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java
hadoop/common/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java
Modified: hadoop/common/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Thu Sep 3 01:36:33 2009
@@ -166,6 +166,8 @@
the io package and makes it available to other users (MAPREDUCE-318).
(Jothi Padmanabhan via ddas)
+ HADOOP-6165. Add metadata to Serializations. (tomwhite)
+
IMPROVEMENTS
HADOOP-4565. Added CombineFileInputFormat to use data locality information
Modified: hadoop/common/trunk/src/java/core-default.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/core-default.xml?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/core-default.xml (original)
+++ hadoop/common/trunk/src/java/core-default.xml Thu Sep 3 01:36:33 2009
@@ -101,7 +101,7 @@
<property>
<name>io.serializations</name>
-
<value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
+
<value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization,org.apache.hadoop.io.serializer.avro.AvroGenericSerialization</value>
<description>A list of serialization classes that can be used for
obtaining serializers and deserializers.</description>
</property>
Modified:
hadoop/common/trunk/src/java/org/apache/hadoop/io/DefaultStringifier.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/DefaultStringifier.java?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/DefaultStringifier.java
(original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/DefaultStringifier.java
Thu Sep 3 01:36:33 2009
@@ -21,20 +21,21 @@
import java.io.IOException;
import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
+import java.util.Map;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializationBase;
import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.SerializerBase;
import org.apache.hadoop.util.GenericsUtil;
/**
* DefaultStringifier is the default implementation of the {@link Stringifier}
* interface which stringifies the objects using base64 encoding of the
- * serialized version of the objects. The {@link Serializer} and
- * {@link Deserializer} are obtained from the {@link SerializationFactory}.
+ * serialized version of the objects. The {@link SerializerBase} and
+ * {@link DeserializerBase} are obtained from the {@link SerializationFactory}.
* <br>
* DefaultStringifier offers convenience methods to store/load objects to/from
* the configuration.
@@ -45,9 +46,9 @@
private static final String SEPARATOR = ",";
- private Serializer<T> serializer;
+ private SerializerBase<T> serializer;
- private Deserializer<T> deserializer;
+ private DeserializerBase<T> deserializer;
private DataInputBuffer inBuf;
@@ -56,8 +57,9 @@
public DefaultStringifier(Configuration conf, Class<T> c) {
SerializationFactory factory = new SerializationFactory(conf);
- this.serializer = factory.getSerializer(c);
- this.deserializer = factory.getDeserializer(c);
+ Map<String, String> metadata = SerializationBase.getMetadataFromClass(c);
+ this.serializer = factory.getSerializer(metadata);
+ this.deserializer = factory.getDeserializer(metadata);
this.inBuf = new DataInputBuffer();
this.outBuf = new DataOutputBuffer();
try {
@@ -102,7 +104,7 @@
* @param item the object to be stored
* @param keyName the name of the key to use
* @throws IOException : forwards Exceptions from the underlying
- * {@link Serialization} classes.
+ * {@link SerializationBase} classes.
*/
public static <K> void store(Configuration conf, K item, String keyName)
throws IOException {
@@ -122,7 +124,7 @@
* @param itemClass the class of the item
* @return restored object
* @throws IOException : forwards Exceptions from the underlying
- * {@link Serialization} classes.
+ * {@link SerializationBase} classes.
*/
public static <K> K load(Configuration conf, String keyName,
Class<K> itemClass) throws IOException {
@@ -145,7 +147,7 @@
* @param keyName the name of the key to use
* @throws IndexOutOfBoundsException if the items array is empty
* @throws IOException : forwards Exceptions from the underlying
- * {@link Serialization} classes.
+ * {@link SerializationBase} classes.
*/
public static <K> void storeArray(Configuration conf, K[] items,
String keyName) throws IOException {
@@ -173,7 +175,7 @@
* @param itemClass the class of the item
* @return restored object
* @throws IOException : forwards Exceptions from the underlying
- * {@link Serialization} classes.
+ * {@link SerializationBase} classes.
*/
public static <K> K[] loadArray(Configuration conf, String keyName,
Class<K> itemClass) throws IOException {
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
(original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Thu Sep
3 01:36:33 2009
@@ -33,9 +33,10 @@
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializationBase;
+import org.apache.hadoop.io.serializer.SerializerBase;
import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
@@ -705,6 +706,14 @@
return new TreeMap<Text, Text>(this.theMetadata);
}
+ public Map<String, String> getMetadataAsStringMap() {
+ Map<String, String> map = new HashMap<String, String>();
+ for (Map.Entry<Text, Text> entry : theMetadata.entrySet()) {
+ map.put(entry.getKey().toString(), entry.getValue().toString());
+ }
+ return map;
+ }
+
public void write(DataOutput out) throws IOException {
out.writeInt(this.theMetadata.size());
Iterator<Map.Entry<Text, Text>> iter =
@@ -801,9 +810,9 @@
Metadata metadata = null;
Compressor compressor = null;
- protected Serializer keySerializer;
- protected Serializer uncompressedValSerializer;
- protected Serializer compressedValSerializer;
+ protected SerializerBase keySerializer;
+ protected SerializerBase uncompressedValSerializer;
+ protected SerializerBase compressedValSerializer;
// Insert a globally unique 16-byte value every few entries, so that one
// can seek into the middle of a file and then synchronize with record
@@ -914,9 +923,10 @@
this.codec = codec;
this.metadata = metadata;
SerializationFactory serializationFactory = new
SerializationFactory(conf);
- this.keySerializer = serializationFactory.getSerializer(keyClass);
+ this.keySerializer = getSerializer(serializationFactory, keyClass,
metadata);
this.keySerializer.open(buffer);
- this.uncompressedValSerializer =
serializationFactory.getSerializer(valClass);
+ this.uncompressedValSerializer = getSerializer(serializationFactory,
+ valClass, metadata);
this.uncompressedValSerializer.open(buffer);
if (this.codec != null) {
ReflectionUtils.setConf(this.codec, this.conf);
@@ -924,11 +934,20 @@
this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
this.deflateOut =
new DataOutputStream(new BufferedOutputStream(deflateFilter));
- this.compressedValSerializer =
serializationFactory.getSerializer(valClass);
+ this.compressedValSerializer = getSerializer(serializationFactory,
+ valClass, metadata);
this.compressedValSerializer.open(deflateOut);
}
}
+ @SuppressWarnings("unchecked")
+ private SerializerBase getSerializer(SerializationFactory sf, Class c,
+ Metadata metadata) {
+ Map<String, String> stringMetadata = metadata.getMetadataAsStringMap();
+ stringMetadata.put(SerializationBase.CLASS_KEY, c.getName());
+ return sf.getSerializer(stringMetadata);
+ }
+
/** Returns the class of keys in this file. */
public Class getKeyClass() { return keyClass; }
@@ -1412,8 +1431,8 @@
private DataInputStream valIn = null;
private Decompressor valDecompressor = null;
- private Deserializer keyDeserializer;
- private Deserializer valDeserializer;
+ private DeserializerBase keyDeserializer;
+ private DeserializerBase valDeserializer;
/** Open the named file. */
public Reader(FileSystem fs, Path file, Configuration conf)
@@ -1563,21 +1582,24 @@
SerializationFactory serializationFactory =
new SerializationFactory(conf);
this.keyDeserializer =
- getDeserializer(serializationFactory, getKeyClass());
+ getDeserializer(serializationFactory, getKeyClass(), metadata);
if (!blockCompressed) {
this.keyDeserializer.open(valBuffer);
} else {
this.keyDeserializer.open(keyIn);
}
this.valDeserializer =
- getDeserializer(serializationFactory, getValueClass());
+ getDeserializer(serializationFactory, getValueClass(), metadata);
this.valDeserializer.open(valIn);
}
}
@SuppressWarnings("unchecked")
- private Deserializer getDeserializer(SerializationFactory sf, Class c) {
- return sf.getDeserializer(c);
+ private DeserializerBase getDeserializer(SerializationFactory sf, Class c,
+ Metadata metadata) {
+ Map<String, String> stringMetadata = metadata.getMetadataAsStringMap();
+ stringMetadata.put(SerializationBase.CLASS_KEY, c.getName());
+ return sf.getDeserializer(stringMetadata);
}
/** Close the file. */
Modified:
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Deserializer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Deserializer.java?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
---
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Deserializer.java
(original)
+++
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Deserializer.java
Thu Sep 3 01:36:33 2009
@@ -34,6 +34,7 @@
* </p>
* @param <T>
*/
+@Deprecated
public interface Deserializer<T> {
/**
* <p>Prepare the deserializer for reading.</p>
Added:
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerBase.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerBase.java?rev=810756&view=auto
==============================================================================
---
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerBase.java
(added)
+++
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerBase.java
Thu Sep 3 01:36:33 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.serializer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configured;
+
+public abstract class DeserializerBase<T> extends Configured
+ implements Closeable, Deserializer<T> {
+
+ /**
+ * <p>Prepare the deserializer for reading.</p>
+ */
+ public abstract void open(InputStream in) throws IOException;
+
+ /**
+ * <p>
+ * Deserialize the next object from the underlying input stream.
+ * If the object <code>t</code> is non-null then this deserializer
+ * <i>may</i> set its internal state to the next object read from the input
+ * stream. Otherwise, if the object <code>t</code> is null a new
+ * deserialized object will be created.
+ * </p>
+ * @return the deserialized object
+ */
+ public abstract T deserialize(T t) throws IOException;
+
+}
Modified:
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
---
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java
(original)
+++
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java
Thu Sep 3 01:36:33 2009
@@ -52,6 +52,13 @@
this.deserializer.open(buffer);
}
+ protected DeserializerComparator(DeserializerBase<T> deserializer)
+ throws IOException {
+
+ this.deserializer = deserializer;
+ this.deserializer.open(buffer);
+ }
+
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
Added:
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacyDeserializer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacyDeserializer.java?rev=810756&view=auto
==============================================================================
---
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacyDeserializer.java
(added)
+++
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacyDeserializer.java
Thu Sep 3 01:36:33 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.serializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+@SuppressWarnings("deprecation")
+class LegacyDeserializer<T> extends DeserializerBase<T> {
+
+ private Deserializer<T> deserializer;
+
+ public LegacyDeserializer(Deserializer<T> deserializer) {
+ this.deserializer = deserializer;
+ }
+
+ @Override
+ public void open(InputStream in) throws IOException {
+ deserializer.open(in);
+ }
+
+ @Override
+ public T deserialize(T t) throws IOException {
+ return deserializer.deserialize(t);
+ }
+
+ @Override
+ public void close() throws IOException {
+ deserializer.close();
+ }
+
+}
Added:
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java?rev=810756&view=auto
==============================================================================
---
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java
(added)
+++
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java
Thu Sep 3 01:36:33 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * <p>
+ * Wraps a legacy {@link Serialization} as a {@link SerializationBase}.
+ * </p>
+ *
+ * @param <T>
+ */
+@SuppressWarnings("deprecation")
+class LegacySerialization<T> extends SerializationBase<T> {
+
+ private Serialization<T> serialization;
+
+ public LegacySerialization(Serialization<T> serialization,
+ Configuration conf) {
+ this.serialization = serialization;
+ setConf(conf);
+ }
+
+ Serialization<T> getUnderlyingSerialization() {
+ return serialization;
+ }
+
+ @Deprecated
+ @Override
+ public boolean accept(Class<?> c) {
+ return serialization.accept(c);
+ }
+
+ @Deprecated
+ @Override
+ public Deserializer<T> getDeserializer(Class<T> c) {
+ return serialization.getDeserializer(c);
+ }
+
+ @Deprecated
+ @Override
+ public Serializer<T> getSerializer(Class<T> c) {
+ return serialization.getSerializer(c);
+ }
+
+ @Override
+ public boolean accept(Map<String, String> metadata) {
+ Class<?> c = getClassFromMetadata(metadata);
+ return accept(c);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public SerializerBase<T> getSerializer(Map<String, String> metadata) {
+ Class<T> c = (Class<T>) getClassFromMetadata(metadata);
+ return new LegacySerializer<T>(getSerializer(c));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public DeserializerBase<T> getDeserializer(Map<String, String> metadata) {
+ Class<T> c = (Class<T>) getClassFromMetadata(metadata);
+ return new LegacyDeserializer<T>(getDeserializer(c));
+ }
+
+}
Added:
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacySerializer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacySerializer.java?rev=810756&view=auto
==============================================================================
---
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacySerializer.java
(added)
+++
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/LegacySerializer.java
Thu Sep 3 01:36:33 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.serializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Map;
+
+@SuppressWarnings("deprecation")
+class LegacySerializer<T> extends SerializerBase<T> {
+
+ private Serializer<T> serializer;
+
+ public LegacySerializer(Serializer<T> serializer) {
+ this.serializer = serializer;
+ }
+
+ @Override
+ public void open(OutputStream out) throws IOException {
+ serializer.open(out);
+ }
+
+ @Override
+ public void serialize(T t) throws IOException {
+ serializer.serialize(t);
+ }
+
+ @Override
+ public void close() throws IOException {
+ serializer.close();
+ }
+
+ @Override
+ public Map<String, String> getMetadata() throws IOException {
+ return Collections.<String, String>emptyMap();
+ }
+
+}
Modified:
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Serialization.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Serialization.java?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
---
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Serialization.java
(original)
+++
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Serialization.java
Thu Sep 3 01:36:33 2009
@@ -24,6 +24,7 @@
* </p>
* @param <T>
*/
+@Deprecated
public interface Serialization<T> {
/**
Added:
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializationBase.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializationBase.java?rev=810756&view=auto
==============================================================================
---
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializationBase.java
(added)
+++
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializationBase.java
Thu Sep 3 01:36:33 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configured;
+
+/**
+ * <p>
+ * Encapsulates a {@link SerializerBase}/{@link DeserializerBase} pair.
+ * </p>
+ *
+ * @param <T>
+ */
+public abstract class SerializationBase<T> extends Configured
+ implements Serialization<T> {
+
+ public static final String SERIALIZATION_KEY = "Serialization-Class";
+ public static final String CLASS_KEY = "Serialized-Class";
+
+ public static Map<String, String> getMetadataFromClass(Class<?> c) {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put(CLASS_KEY, c.getName());
+ return metadata;
+ }
+
+ @Deprecated
+ @Override
+ public boolean accept(Class<?> c) {
+ return accept(getMetadataFromClass(c));
+ }
+
+ @Deprecated
+ @Override
+ public Deserializer<T> getDeserializer(Class<T> c) {
+ return getDeserializer(getMetadataFromClass(c));
+ }
+
+ @Deprecated
+ @Override
+ public Serializer<T> getSerializer(Class<T> c) {
+ return getSerializer(getMetadataFromClass(c));
+ }
+
+ /**
+ * Allows clients to test whether this {@link SerializationBase} supports the
+ * given metadata.
+ */
+ public abstract boolean accept(Map<String, String> metadata);
+
+ /**
+ * @return a {@link SerializerBase} for the given metadata.
+ */
+ public abstract SerializerBase<T> getSerializer(Map<String, String>
metadata);
+
+ /**
+ * @return a {@link DeserializerBase} for the given metadata.
+ */
+ public abstract DeserializerBase<T> getDeserializer(
+ Map<String, String> metadata);
+
+ protected Class<?> getClassFromMetadata(Map<String, String> metadata) {
+ String classname = metadata.get(CLASS_KEY);
+ if (classname == null) {
+ return null;
+ }
+ try {
+ return getConf().getClassByName(classname);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+}
Modified:
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
---
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java
(original)
+++
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java
Thu Sep 3 01:36:33 2009
@@ -20,11 +20,13 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.serializer.avro.AvroGenericSerialization;
import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization;
import org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization;
import org.apache.hadoop.util.ReflectionUtils;
@@ -32,7 +34,7 @@
/**
* <p>
- * A factory for {@link Serialization}s.
+ * A factory for {@link SerializationBase}s.
* </p>
*/
public class SerializationFactory extends Configured {
@@ -40,7 +42,10 @@
private static final Log LOG =
LogFactory.getLog(SerializationFactory.class.getName());
- private List<Serialization<?>> serializations = new
ArrayList<Serialization<?>>();
+ private List<SerializationBase<?>> serializations =
+ new ArrayList<SerializationBase<?>>();
+ private List<SerializationBase<?>> legacySerializations =
+ new ArrayList<SerializationBase<?>>();
/**
* <p>
@@ -54,7 +59,8 @@
for (String serializerName : conf.getStrings("io.serializations",
new String[]{WritableSerialization.class.getName(),
AvroSpecificSerialization.class.getName(),
- AvroReflectSerialization.class.getName()})) {
+ AvroReflectSerialization.class.getName(),
+ AvroGenericSerialization.class.getName()})) {
add(conf, serializerName);
}
}
@@ -62,30 +68,62 @@
@SuppressWarnings("unchecked")
private void add(Configuration conf, String serializationName) {
try {
-
- Class<? extends Serialization> serializionClass =
- (Class<? extends Serialization>)
conf.getClassByName(serializationName);
- serializations.add((Serialization)
- ReflectionUtils.newInstance(serializionClass, getConf()));
+ Class<?> serializationClass = conf.getClassByName(serializationName);
+ if (SerializationBase.class.isAssignableFrom(serializationClass)) {
+ serializations.add((SerializationBase)
+ ReflectionUtils.newInstance(serializationClass, getConf()));
+ } else if (Serialization.class.isAssignableFrom(serializationClass)) {
+ Serialization serialization = (Serialization)
+ ReflectionUtils.newInstance(serializationClass, getConf());
+ legacySerializations.add(new LegacySerialization(serialization,
+ getConf()));
+ } else {
+ LOG.warn("Serialization class " + serializationName + " is not an " +
+ "instance of Serialization or BaseSerialization.");
+ }
} catch (ClassNotFoundException e) {
- LOG.warn("Serilization class not found: " +
+ LOG.warn("Serialization class not found: " +
StringUtils.stringifyException(e));
}
}
+ @Deprecated
public <T> Serializer<T> getSerializer(Class<T> c) {
return getSerialization(c).getSerializer(c);
}
+ @Deprecated
public <T> Deserializer<T> getDeserializer(Class<T> c) {
return getSerialization(c).getDeserializer(c);
}
- @SuppressWarnings("unchecked")
+ @Deprecated
public <T> Serialization<T> getSerialization(Class<T> c) {
- for (Serialization serialization : serializations) {
- if (serialization.accept(c)) {
- return (Serialization<T>) serialization;
+ return getSerialization(SerializationBase.getMetadataFromClass(c));
+ }
+
+ public <T> SerializerBase<T> getSerializer(Map<String, String> metadata) {
+ SerializationBase<T> serialization = getSerialization(metadata);
+ return serialization.getSerializer(metadata);
+ }
+
+ public <T> DeserializerBase<T> getDeserializer(Map<String, String> metadata)
{
+ SerializationBase<T> serialization = getSerialization(metadata);
+ return serialization.getDeserializer(metadata);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> SerializationBase<T> getSerialization(Map<String, String>
metadata) {
+ for (SerializationBase serialization : serializations) {
+ if (serialization.accept(metadata)) {
+ return (SerializationBase<T>) serialization;
+ }
+ }
+ // Look in the legacy serializations last, since they ignore
+ // non-class metadata
+ for (SerializationBase serialization : legacySerializations) {
+ if (serialization.accept(metadata)) {
+ return (SerializationBase<T>) serialization;
}
}
return null;
Modified:
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Serializer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Serializer.java?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
---
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Serializer.java
(original)
+++
hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/Serializer.java
Thu Sep 3 01:36:33 2009
@@ -34,6 +34,7 @@
* </p>
* @param <T>
*/
+@Deprecated
public interface Serializer<T> {
/**
* <p>Prepare the serializer for writing.</p>
Added: hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializerBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializerBase.java?rev=810756&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializerBase.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/SerializerBase.java Thu Sep 3 01:36:33 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.serializer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configured;
+/** Configured, Closeable {@link Serializer} that also reports its metadata. */
+public abstract class SerializerBase<T> extends Configured
+ implements Closeable, Serializer<T> {
+
+ /**
+ * <p>Prepare the serializer for writing.</p>
+ */
+ public abstract void open(OutputStream out) throws IOException;
+
+ /**
+ * <p>Serialize <code>t</code> to the underlying output stream.</p>
+ */
+ public abstract void serialize(T t) throws IOException;
+ /** Returns this serializer's metadata as string key/value pairs. */
+ public abstract Map<String, String> getMetadata() throws IOException;
+
+}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java Thu Sep 3 01:36:33 2009
@@ -23,22 +23,20 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
/**
- * A {@link Serialization} for {@link Writable}s that delegates to
+ * A {@link SerializationBase} for {@link Writable}s that delegates to
 * {@link Writable#write(java.io.DataOutput)} and
 * {@link Writable#readFields(java.io.DataInput)}.
*/
-public class WritableSerialization extends Configured
- implements Serialization<Writable> {
+public class WritableSerialization extends SerializationBase<Writable> {
- static class WritableDeserializer extends Configured
- implements Deserializer<Writable> {
+ static class WritableDeserializer extends DeserializerBase<Writable> {
private Class<?> writableClass;
private DataInputStream dataIn;
@@ -48,6 +46,7 @@
this.writableClass = c;
}
+ @Override
public void open(InputStream in) {
if (in instanceof DataInputStream) {
dataIn = (DataInputStream) in;
@@ -56,6 +55,7 @@
}
}
+ @Override
public Writable deserialize(Writable w) throws IOException {
Writable writable;
if (w == null) {
@@ -68,16 +68,23 @@
return writable;
}
+ @Override
public void close() throws IOException {
dataIn.close();
}
}
- static class WritableSerializer implements Serializer<Writable> {
-
+ static class WritableSerializer extends SerializerBase<Writable> {
+
+ private Map<String, String> metadata;
private DataOutputStream dataOut;
+ public WritableSerializer(Map<String, String> metadata) {
+ this.metadata = metadata;
+ }
+
+ @Override
public void open(OutputStream out) {
if (out instanceof DataOutputStream) {
dataOut = (DataOutputStream) out;
@@ -86,26 +93,41 @@
}
}
+ @Override
public void serialize(Writable w) throws IOException {
w.write(dataOut);
}
+ @Override
public void close() throws IOException {
dataOut.close();
}
- }
+ @Override
+ public Map<String, String> getMetadata() throws IOException {
+ return metadata;
+ }
- public boolean accept(Class<?> c) {
- return Writable.class.isAssignableFrom(c);
}
- public Deserializer<Writable> getDeserializer(Class<Writable> c) {
- return new WritableDeserializer(getConf(), c);
+ @Override
+ public boolean accept(Map<String, String> metadata) {
+ if (getClass().getName().equals(metadata.get(SERIALIZATION_KEY))) {
+ return true;
+ }
+ Class<?> c = getClassFromMetadata(metadata);
+ return c == null ? false : Writable.class.isAssignableFrom(c);
}
- public Serializer<Writable> getSerializer(Class<Writable> c) {
- return new WritableSerializer();
+ @Override
+ public SerializerBase<Writable> getSerializer(Map<String, String> metadata) {
+ return new WritableSerializer(metadata);
+ }
+
+ @Override
+ public DeserializerBase<Writable> getDeserializer(Map<String, String> metadata) {
+ Class<?> c = getClassFromMetadata(metadata);
+ return new WritableDeserializer(getConf(), c);
}
}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java?rev=810756&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java Thu Sep 3 01:36:33 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer.avro;
+
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.io.serializer.SerializationBase;
+
+/**
+ * Serialization for Avro Generic data, driven by metadata rather than by a
+ * class. Metadata is accepted when it has a value for
+ * {@link AvroSerialization#AVRO_SCHEMA_KEY}, the JSON Avro schema used for
+ * reading and writing; {@link SerializationBase#SERIALIZATION_KEY} is not
+ * consulted by {@link #accept(Map)}.
+ */
+public class AvroGenericSerialization extends AvroSerialization<Object> {
+
+  /** Accepts any metadata that has a value for {@code AVRO_SCHEMA_KEY}. */
+  @Override
+  public boolean accept(Map<String, String> metadata) {
+    return metadata.get(AVRO_SCHEMA_KEY) != null;
+  }
+
+  /**
+   * Creates a generic reader for the schema stored under
+   * {@code AVRO_SCHEMA_KEY}.
+   * @throws IllegalArgumentException if the metadata has no schema
+   */
+  @Override
+  protected DatumReader<Object> getReader(Map<String, String> metadata) {
+    String jsonSchema = metadata.get(AVRO_SCHEMA_KEY);
+    if (jsonSchema == null) {
+      throw new IllegalArgumentException(
+          "No Avro schema in metadata under key " + AVRO_SCHEMA_KEY);
+    }
+    return new GenericDatumReader<Object>(Schema.parse(jsonSchema));
+  }
+
+  /**
+   * Returns the schema stored under {@code AVRO_SCHEMA_KEY}, or one
+   * induced from <code>t</code> when the metadata has none.
+   */
+  @Override
+  protected Schema getSchema(Object t, Map<String, String> metadata) {
+    String jsonSchema = metadata.get(AVRO_SCHEMA_KEY);
+    return jsonSchema != null
+        ? Schema.parse(jsonSchema) : GenericData.induce(t);
+  }
+
+  /** Creates a generic writer; the serializer sets its schema per datum. */
+  @Override
+  protected DatumWriter<Object> getWriter(Map<String, String> metadata) {
+    return new GenericDatumWriter<Object>();
+  }
+
+}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java Thu Sep 3 01:36:33 2009
@@ -19,6 +19,7 @@
package org.apache.hadoop.io.serializer.avro;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema;
@@ -27,6 +28,7 @@
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
/**
* Serialization for Avro Reflect classes. For a class to be accepted by this
@@ -47,10 +49,18 @@
private Set<String> packages;
- public synchronized boolean accept(Class<?> c) {
+ @Override
+ public synchronized boolean accept(Map<String, String> metadata) {
if (packages == null) {
getPackages();
}
+ if (getClass().getName().equals(metadata.get(SERIALIZATION_KEY))) {
+ return true;
+ }
+ Class<?> c = getClassFromMetadata(metadata);
+ if (c == null) {
+ return false;
+ }
return AvroReflectSerializable.class.isAssignableFrom(c) ||
packages.contains(c.getPackage().getName());
}
@@ -65,8 +75,11 @@
}
}
- protected DatumReader getReader(Class<Object> clazz) {
+ @Override
+ protected DatumReader getReader(Map<String, String> metadata) {
try {
+ Class<SpecificRecord> clazz = (Class<SpecificRecord>)
+ getClassFromMetadata(metadata);
String prefix =
((clazz.getEnclosingClass() == null
|| "null".equals(clazz.getEnclosingClass().getName())) ?
@@ -78,11 +91,13 @@
}
}
- protected Schema getSchema(Object t) {
+ @Override
+ protected Schema getSchema(Object t, Map<String, String> metadata) {
return ReflectData.getSchema(t.getClass());
}
- protected DatumWriter getWriter(Class<Object> clazz) {
+ @Override
+ protected DatumWriter getWriter(Map<String, String> metadata) {
return new ReflectDatumWriter();
}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java Thu Sep 3 01:36:33 2009
@@ -21,92 +21,105 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializationBase;
+import org.apache.hadoop.io.serializer.SerializerBase;
/**
* Base class for providing serialization to Avro types.
*/
-public abstract class AvroSerialization<T> extends Configured
- implements Serialization<T>{
+public abstract class AvroSerialization<T> extends SerializationBase<T> {
+
+ public static final String AVRO_SCHEMA_KEY = "Avro-Schema";
- public Deserializer<T> getDeserializer(Class<T> c) {
- return new AvroDeserializer(c);
+ public DeserializerBase<T> getDeserializer(Map<String, String> metadata) {
+ return new AvroDeserializer(metadata);
}
- public Serializer<T> getSerializer(Class<T> c) {
- return new AvroSerializer(c);
+ public SerializerBase<T> getSerializer(Map<String, String> metadata) {
+ return new AvroSerializer(metadata);
}
/**
- * Return an Avro Schema instance for the given class.
+ * Return an Avro Schema instance for the given class and metadata.
*/
- protected abstract Schema getSchema(T t);
+ protected abstract Schema getSchema(T t, Map<String, String> metadata);
/**
- * Create and return Avro DatumWriter for the given class.
+ * Create and return Avro DatumWriter for the given metadata.
*/
- protected abstract DatumWriter<T> getWriter(Class<T> clazz);
+ protected abstract DatumWriter<T> getWriter(Map<String, String> metadata);
/**
- * Create and return Avro DatumReader for the given class.
+ * Create and return Avro DatumReader for the given metadata.
*/
- protected abstract DatumReader<T> getReader(Class<T> clazz);
+ protected abstract DatumReader<T> getReader(Map<String, String> metadata);
- class AvroSerializer implements Serializer<T> {
+ class AvroSerializer extends SerializerBase<T> {
+ private Map<String, String> metadata;
private DatumWriter<T> writer;
private BinaryEncoder encoder;
private OutputStream outStream;
- protected Class<T> clazz;
- AvroSerializer(Class<T> clazz) {
- writer = getWriter(clazz);
+ AvroSerializer(Map<String, String> metadata) {
+ this.metadata = metadata;
+ writer = getWriter(metadata);
}
+ @Override
public void close() throws IOException {
encoder.flush();
outStream.close();
}
+ @Override
public void open(OutputStream out) throws IOException {
outStream = out;
encoder = new BinaryEncoder(out);
}
+ @Override
public void serialize(T t) throws IOException {
- writer.setSchema(getSchema(t));
+ writer.setSchema(getSchema(t, metadata));
writer.write(t, encoder);
}
+ @Override
+ public Map<String, String> getMetadata() throws IOException {
+ return metadata;
+ }
+
}
- class AvroDeserializer implements Deserializer<T> {
+ class AvroDeserializer extends DeserializerBase<T> {
private DatumReader<T> reader;
private BinaryDecoder decoder;
private InputStream inStream;
- AvroDeserializer(Class<T> clazz) {
- this.reader = getReader(clazz);
+ AvroDeserializer(Map<String, String> metadata) {
+ this.reader = getReader(metadata);
}
+ @Override
public void close() throws IOException {
inStream.close();
}
+ @Override
public T deserialize(T t) throws IOException {
return reader.read(t, decoder);
}
+ @Override
public void open(InputStream in) throws IOException {
inStream = in;
decoder = new BinaryDecoder(in);
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java Thu Sep 3 01:36:33 2009
@@ -18,7 +18,10 @@
package org.apache.hadoop.io.serializer.avro;
+import java.util.Map;
+
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
@@ -33,23 +36,33 @@
public class AvroSpecificSerialization
extends AvroSerialization<SpecificRecord>{
- public boolean accept(Class<?> c) {
- return SpecificRecord.class.isAssignableFrom(c);
+ @Override
+ public boolean accept(Map<String, String> metadata) {
+ if (getClass().getName().equals(metadata.get(SERIALIZATION_KEY))) {
+ return true;
+ }
+ Class<?> c = getClassFromMetadata(metadata);
+ return c == null ? false : SpecificRecord.class.isAssignableFrom(c);
}
- protected DatumReader getReader(Class<SpecificRecord> clazz) {
+ @Override
+ protected DatumReader getReader(Map<String, String> metadata) {
try {
+ Class<SpecificRecord> clazz = (Class<SpecificRecord>)
+ getClassFromMetadata(metadata);
return new SpecificDatumReader(clazz.newInstance().schema());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- protected Schema getSchema(SpecificRecord t) {
+ @Override
+ protected Schema getSchema(SpecificRecord t, Map<String, String> metadata) {
return t.schema();
}
- protected DatumWriter getWriter(Class<SpecificRecord> clazz) {
+ @Override
+ protected DatumWriter getWriter(Map<String, String> metadata) {
return new SpecificDatumWriter();
}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java Thu Sep 3 01:36:33 2009
@@ -18,21 +18,27 @@
package org.apache.hadoop.util;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
-import java.io.*;
-import java.lang.management.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializationBase;
import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.SerializerBase;
/**
* General reflection utils
@@ -269,11 +275,12 @@
buffer.outBuffer.reset();
SerializationFactory factory = getFactory(conf);
Class<T> cls = (Class<T>) src.getClass();
- Serializer<T> serializer = factory.getSerializer(cls);
+ Map<String, String> metadata = SerializationBase.getMetadataFromClass(cls);
+ SerializerBase<T> serializer = factory.getSerializer(metadata);
serializer.open(buffer.outBuffer);
serializer.serialize(src);
buffer.moveData();
- Deserializer<T> deserializer = factory.getDeserializer(cls);
+ DeserializerBase<T> deserializer = factory.getDeserializer(metadata);
deserializer.open(buffer.inBuffer);
dst = deserializer.deserialize(dst);
return dst;
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java Thu Sep 3 01:36:33 2009
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.io.serializer;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -33,19 +35,33 @@
* @return deserialized item
*/
public static<K> K testSerialization(Configuration conf, K before)
- throws Exception {
-
+ throws Exception {
+ Map<String, String> metadata =
+ SerializationBase.getMetadataFromClass(GenericsUtil.getClass(before));
+ return testSerialization(conf, metadata, before);
+ }
+
+ /**
+ * A utility that tests serialization/deserialization.
+ * @param conf configuration to use, "io.serializations" is read to
+ * determine the serialization
+ * @param metadata the metadata to pass to the serializer/deserializer
+ * @param <K> the class of the item
+ * @param before item to (de)serialize
+ * @return deserialized item
+ */
+ public static <K> K testSerialization(Configuration conf,
+ Map<String, String> metadata, K before) throws Exception {
+
SerializationFactory factory = new SerializationFactory(conf);
- Serializer<K> serializer
- = factory.getSerializer(GenericsUtil.getClass(before));
- Deserializer<K> deserializer
- = factory.getDeserializer(GenericsUtil.getClass(before));
-
+ SerializerBase<K> serializer = factory.getSerializer(metadata);
+ DeserializerBase<K> deserializer = factory.getDeserializer(metadata);
+
DataOutputBuffer out = new DataOutputBuffer();
serializer.open(out);
serializer.serialize(before);
serializer.close();
-
+
DataInputBuffer in = new DataInputBuffer();
in.reset(out.getData(), out.getLength());
deserializer.open(in);
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java?rev=810756&r1=810755&r2=810756&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java Thu Sep 3 01:36:33 2009
@@ -18,9 +18,14 @@
package org.apache.hadoop.io.serializer.avro;
+import java.util.HashMap;
+import java.util.Map;
+
import junit.framework.TestCase;
+import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serializer.SerializationBase;
import org.apache.hadoop.io.serializer.SerializationTestUtil;
public class TestAvroSerialization extends TestCase {
@@ -59,6 +64,16 @@
SerializationTestUtil.testSerialization(conf, before);
assertEquals(before, after);
}
+
+ public void testGeneric() throws Exception {
+ Utf8 before = new Utf8("hadoop");
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put(SerializationBase.SERIALIZATION_KEY,
+ AvroGenericSerialization.class.getName());
+ metadata.put(AvroSerialization.AVRO_SCHEMA_KEY, "\"string\"");
+ Utf8 after = SerializationTestUtil.testSerialization(conf, metadata, before);
+ assertEquals(before, after);
+ }
public static class InnerRecord {
public int x = 7;