Repository: samza
Updated Branches:
  refs/heads/master 9f8682138 -> 924a78dab
SAMZA-1622: avro writer to support generic record

avro writer in HDFS system producer to support generic record

Author: Hai Lu <[email protected]>

Reviewers: Xinyu Liu <[email protected]>

Closes #449 from lhaiesp/master

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/924a78da
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/924a78da
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/924a78da

Branch: refs/heads/master
Commit: 924a78dabc4fd55d5c50ad3a74d46e660e22d0a5
Parents: 9f86821
Author: Hai Lu <[email protected]>
Authored: Fri Mar 16 09:22:01 2018 -0700
Committer: xiliu <[email protected]>
Committed: Fri Mar 16 09:22:01 2018 -0700

----------------------------------------------------------------------
 .../system/hdfs/writer/AvroDataFileHdfsWriter.scala | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/924a78da/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala
index c3da611..f1868cd 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala
@@ -20,10 +20,10 @@
 package org.apache.samza.system.hdfs.writer
 
 import org.apache.avro.file.{CodecFactory, DataFileWriter}
+import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
 import org.apache.avro.reflect.{ReflectData, ReflectDatumWriter}
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.io.IOUtils
-import org.apache.hadoop.io.compress.{DefaultCodec, GzipCodec, SnappyCodec}
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.hdfs.HdfsConfig
 
@@ -67,11 +67,17 @@ class AvroDataFileHdfsWriter (dfs: FileSystem, systemName: String, config: HdfsC
 
   protected def getNextWriter(record: Object): Option[DataFileWriter[Object]] = {
     val path = bucketer.get.getNextWritePath(dfs)
-    val schema = ReflectData.get().getSchema(record.getClass)
-    val datumWriter = new ReflectDatumWriter[Object](schema)
+    val isGenericRecord = record.isInstanceOf[GenericRecord]
+    val schema = record match {
+      case genericRecord: GenericRecord => genericRecord.getSchema
+      case _ => ReflectData.get().getSchema(record.getClass)
+    }
+    val datumWriter = if (isGenericRecord)
+      new GenericDatumWriter[Object](schema)
+    else new ReflectDatumWriter[Object](schema)
     val fileWriter = new DataFileWriter[Object](datumWriter)
     val cn = config.getCompressionType(systemName)
-    if (cn != "none") fileWriter.setCodec(CodecFactory.fromString(cn))
+    if (!cn.equals("none")) fileWriter.setCodec(CodecFactory.fromString(cn))
     Some(fileWriter.create(schema, dfs.create(path)))
   }
