Repository: samza
Updated Branches:
  refs/heads/master 9f8682138 -> 924a78dab


SAMZA-1622: avro writer to support generic record

Update the Avro writer in the HDFS system producer to support generic records (Avro GenericRecord).

Author: Hai Lu <[email protected]>

Reviewers: Xinyu Liu <[email protected]>

Closes #449 from lhaiesp/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/924a78da
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/924a78da
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/924a78da

Branch: refs/heads/master
Commit: 924a78dabc4fd55d5c50ad3a74d46e660e22d0a5
Parents: 9f86821
Author: Hai Lu <[email protected]>
Authored: Fri Mar 16 09:22:01 2018 -0700
Committer: xiliu <[email protected]>
Committed: Fri Mar 16 09:22:01 2018 -0700

----------------------------------------------------------------------
 .../system/hdfs/writer/AvroDataFileHdfsWriter.scala   | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/924a78da/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala
index c3da611..f1868cd 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala
@@ -20,10 +20,10 @@
 package org.apache.samza.system.hdfs.writer
 
 import org.apache.avro.file.{CodecFactory, DataFileWriter}
+import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
 import org.apache.avro.reflect.{ReflectData, ReflectDatumWriter}
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.io.IOUtils
-import org.apache.hadoop.io.compress.{DefaultCodec, GzipCodec, SnappyCodec}
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.hdfs.HdfsConfig
 
@@ -67,11 +67,17 @@ class AvroDataFileHdfsWriter (dfs: FileSystem, systemName: String, config: HdfsC
 
   protected def getNextWriter(record: Object): Option[DataFileWriter[Object]] = {
     val path = bucketer.get.getNextWritePath(dfs)
-    val schema = ReflectData.get().getSchema(record.getClass)
-    val datumWriter = new ReflectDatumWriter[Object](schema)
+    val isGenericRecord = record.isInstanceOf[GenericRecord]
+    val schema = record match {
+      case genericRecord: GenericRecord => genericRecord.getSchema
+      case _ => ReflectData.get().getSchema(record.getClass)
+    }
+    val datumWriter = if (isGenericRecord)
+      new GenericDatumWriter[Object](schema)
+    else new ReflectDatumWriter[Object](schema)
     val fileWriter = new DataFileWriter[Object](datumWriter)
     val cn = config.getCompressionType(systemName)
-    if (cn != "none") fileWriter.setCodec(CodecFactory.fromString(cn))
+    if (!cn.equals("none")) fileWriter.setCodec(CodecFactory.fromString(cn))
     Some(fileWriter.create(schema, dfs.create(path)))
   }
 

Reply via email to