Repository: samza Updated Branches: refs/heads/master 84e5aad38 -> 9abcc8eec
SAMZA-876: Add AvroDataFileHdfsWriter Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9abcc8ee Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9abcc8ee Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9abcc8ee Branch: refs/heads/master Commit: 9abcc8eeceb54e1924e5da5dd2ff4f22a6a2ae43 Parents: 84e5aad Author: Edi Bice <[email protected]> Authored: Thu Mar 3 23:00:27 2016 -0800 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Thu Mar 3 23:00:27 2016 -0800 ---------------------------------------------------------------------- .../documentation/versioned/hdfs/producer.md | 14 ++- .../versioned/jobs/configuration-table.html | 7 +- .../apache/samza/system/hdfs/HdfsConfig.scala | 18 +++- .../hdfs/writer/AvroDataFileHdfsWriter.scala | 78 +++++++++++++++++ .../samza-hdfs-test-batch-job-avro.properties | 19 +++++ .../samza-hdfs-test-job-avro.properties | 18 ++++ .../hdfs/TestHdfsSystemProducerTestSuite.scala | 90 +++++++++++++++++++- 7 files changed, 234 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/docs/learn/documentation/versioned/hdfs/producer.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/hdfs/producer.md b/docs/learn/documentation/versioned/hdfs/producer.md index cfd22c6..0865a35 100644 --- a/docs/learn/documentation/versioned/hdfs/producer.md +++ b/docs/learn/documentation/versioned/hdfs/producer.md @@ -21,7 +21,8 @@ title: Isolation ### Writing to HDFS from Samza -The `samza-hdfs` module implements a Samza Producer to write to HDFS. The current implementation includes a ready-to-use `HdfsSystemProducer`, and two `HdfsWriter`s: One that writes messages of raw bytes to a `SequenceFile` of `BytesWritable` keys and values. 
The other writes UTF-8 `String`s to a `SequenceFile` with `LongWritable` keys and `Text` values. +The `samza-hdfs` module implements a Samza Producer to write to HDFS. The current implementation includes a ready-to-use `HdfsSystemProducer`, and three `HdfsWriter`s: One that writes messages of raw bytes to a `SequenceFile` of `BytesWritable` keys and values. Another writes UTF-8 `String`s to a `SequenceFile` with `LongWritable` keys and `Text` values. +The last one writes out Avro data files including the schema automatically reflected from the POJO objects fed to it. ### Configuring an HdfsSystemProducer @@ -33,6 +34,7 @@ You might configure the system producer for use by your `StreamTasks` like this: systems.hdfs-clickstream.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory # define a serializer/deserializer for the hdfs-clickstream system +# DO NOT define (i.e. comment out) a SerDe when using the AvroDataFileHdfsWriter so it can reflect the schema systems.hdfs-clickstream.samza.msg.serde=some-serde-impl # consumer configs not needed for HDFS system, reader is not implemented yet @@ -42,8 +44,10 @@ systems.hdfs-clickstream.streams.metrics.samza.msg.serde=some-metrics-impl # Assign the implementation class for this system's HdfsWriter systems.hdfs-clickstream.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter +#systems.hdfs-clickstream.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter -# Set HDFS SequenceFile compression type. Only BLOCK compression is supported currently +# Set compression type supported by chosen Writer. SequenceFile writers currently support only BLOCK compression. +# AvroDataFileHdfsWriter accepts none or an Avro codec name (e.g. null, deflate, snappy, bzip2); unrecognized names are rejected systems.hdfs-clickstream.producer.hdfs.compression.type=snappy # The base dir for HDFS output.
The default Bucketer for SequenceFile HdfsWriters @@ -56,9 +60,11 @@ systems.hdfs-clickstream.producer.hdfs.bucketer.class=org.apache.samza.system.hd # Configure the DATE_PATH the Bucketer will set to bucket output files by day for this job run. systems.hdfs-clickstream.producer.hdfs.bucketer.date.path.format=yyyy_MM_dd -# Optionally set the max output bytes per file. A new file will be cut and output -# continued on the next write call each time this many bytes are written. +# Optionally set the max output bytes (records for AvroDataFileHdfsWriter) per file. +# A new file will be cut and output continued on the next write call each time this many bytes +# (records for AvroDataFileHdfsWriter) are written. systems.hdfs-clickstream.producer.hdfs.write.batch.size.bytes=134217728 +#systems.hdfs-clickstream.producer.hdfs.write.batch.size.records=10000 ``` The above configuration assumes a Metrics and Serde implemnetation has been properly configured against the `some-serde-impl` and `some-metrics-impl` labels somewhere else in the same `job.properties` file. Each of these properties has a reasonable default, so you can leave out the ones you don't need to customize for your job run. 
http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 175437c..2745a22 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -1525,10 +1525,15 @@ <td class="description">A date format (using Java's SimpleDataFormat syntax) appropriate for use in an HDFS Path, which can configure time-based bucketing of output files.</td> </tr> <tr> - <td class="property" id="hdfs-write-batch-size-bytes">systems.*.producer.write.batch.size.bytes</td> + <td class="property" id="hdfs-write-batch-size-bytes">systems.*.producer.hdfs.write.batch.size.bytes</td> <td class="default">268435456</td> <td class="description">The number of bytes of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 256MB if not set.</td> </tr> + <tr> + <td class="property" id="hdfs-write-batch-size-records">systems.*.producer.hdfs.write.batch.size.records</td> + <td class="default">262144</td> + <td class="description">The number of outgoing messages to write to each HDFS output file before cutting a new file. 
Defaults to 262144 if not set.</td> + </tr> <tr> <th colspan="3" class="section" id="task-migration"> http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala index 7993119..61b7570 100644 --- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala +++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala @@ -43,8 +43,12 @@ object HdfsConfig { val BASE_OUTPUT_DIR_DEFAULT = "/user/%s/%s" // how much data to write before splitting off a new partfile - val WRITE_BATCH_SIZE = "systems.%s.producer.hdfs.write.batch.size.bytes" - val WRITE_BATCH_SIZE_DEFAULT = (1024L * 1024L * 256L).toString + val WRITE_BATCH_SIZE_BYTES = "systems.%s.producer.hdfs.write.batch.size.bytes" + val WRITE_BATCH_SIZE_BYTES_DEFAULT = (1024L * 1024L * 256L).toString + + // how many records to write before splitting off a new partfile + val WRITE_BATCH_SIZE_RECORDS = "systems.%s.producer.hdfs.write.batch.size.records" + val WRITE_BATCH_SIZE_RECORDS_DEFAULT = (256L * 1024L).toString // human-readable compression type name to be interpreted/handled by the HdfsWriter impl val COMPRESSION_TYPE = "systems.%s.producer.hdfs.compression.type" @@ -107,7 +111,15 @@ class HdfsConfig(config: Config) extends ScalaMapConfig(config) { * MapReduce utilization for Hadoop jobs that will process the data later.
*/ def getWriteBatchSizeBytes(systemName: String): Long = { - getOrElse(HdfsConfig.WRITE_BATCH_SIZE format systemName, HdfsConfig.WRITE_BATCH_SIZE_DEFAULT).toLong + getOrElse(HdfsConfig.WRITE_BATCH_SIZE_BYTES format systemName, HdfsConfig.WRITE_BATCH_SIZE_BYTES_DEFAULT).toLong + } + + /** + * Split output files from all writer tasks based on # of records written to optimize + * MapReduce utilization for Hadoop jobs that will process the data later. + */ + def getWriteBatchSizeRecords(systemName: String): Long = { + getOrElse(HdfsConfig.WRITE_BATCH_SIZE_RECORDS format systemName, HdfsConfig.WRITE_BATCH_SIZE_RECORDS_DEFAULT).toLong } /** http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala new file mode 100644 index 0000000..c3da611 --- /dev/null +++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.hdfs.writer + +import org.apache.avro.file.{CodecFactory, DataFileWriter} +import org.apache.avro.reflect.{ReflectData, ReflectDatumWriter} +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.io.IOUtils +import org.apache.hadoop.io.compress.{DefaultCodec, GzipCodec, SnappyCodec} +import org.apache.samza.system.OutgoingMessageEnvelope +import org.apache.samza.system.hdfs.HdfsConfig + +/** + * Implementation of HdfsWriter for Avro data files. Stores in a file a sequence of data conforming to a schema. + * The schema is stored in the file with the data. Each datum in a file is of the same schema. + * Data is grouped into blocks. A synchronization marker is written between blocks, so that files may be split. + * Blocks may be compressed. + */ +class AvroDataFileHdfsWriter (dfs: FileSystem, systemName: String, config: HdfsConfig) + extends HdfsWriter[DataFileWriter[Object]](dfs, systemName, config) { + + val batchSize = config.getWriteBatchSizeRecords(systemName) + val bucketer = Some(Bucketer.getInstance(systemName, config)) + var recordsWritten = 0L + + override def flush: Unit = writer.map { _.flush } + + override def write(outgoing: OutgoingMessageEnvelope): Unit = { + val record = outgoing.getMessage + if (shouldStartNewOutputFile) { + close + writer = getNextWriter(record) + } + + writer.map { seq => + seq.append(record) + recordsWritten += 1 + } + } + + override def close: Unit = { + writer.map { w => w.flush ; IOUtils.closeStream(w) } + writer = None + recordsWritten = 0L + } + + protected def shouldStartNewOutputFile: Boolean = { + recordsWritten >= batchSize || bucketer.get.shouldChangeBucket + } + + protected def getNextWriter(record: Object): Option[DataFileWriter[Object]] = { + val path = bucketer.get.getNextWritePath(dfs) + val schema = ReflectData.get().getSchema(record.getClass) + val datumWriter =
new ReflectDatumWriter[Object](schema) + val fileWriter = new DataFileWriter[Object](datumWriter) + val cn = config.getCompressionType(systemName) + if (cn != "none") fileWriter.setCodec(CodecFactory.fromString(cn)) + Some(fileWriter.create(schema, dfs.create(path))) + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-avro.properties ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-avro.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-avro.properties new file mode 100644 index 0000000..8a0927e --- /dev/null +++ b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-avro.properties @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +systems.samza-hdfs-test-batch-job-avro.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter +systems.samza-hdfs-test-batch-job-avro.producer.hdfs.write.batch.size.records=10 http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/test/resources/samza-hdfs-test-job-avro.properties ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-job-avro.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-job-avro.properties new file mode 100644 index 0000000..f01148a --- /dev/null +++ b/samza-hdfs/src/test/resources/samza-hdfs-test-job-avro.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +systems.samza-hdfs-test-job-avro.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala b/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala index c4b04a1..261310d 100644 --- a/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala +++ b/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala @@ -20,13 +20,16 @@ package org.apache.samza.system.hdfs -import java.io.{File, IOException} +import java.io.{InputStreamReader, File, IOException} import java.net.URI import java.text.SimpleDateFormat import java.util.Date +import org.apache.avro.file.{SeekableFileInput, CodecFactory, DataFileWriter, DataFileReader} +import org.apache.avro.reflect.{ReflectDatumReader, ReflectDatumWriter, ReflectData} +import org.apache.avro.specific.SpecificDatumReader import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} +import org.apache.hadoop.fs._ import org.apache.hadoop.hdfs.{DFSConfigKeys,MiniDFSCluster} import org.apache.hadoop.io.{SequenceFile, BytesWritable, LongWritable, Text} import org.apache.hadoop.io.SequenceFile.Reader @@ -52,7 +55,9 @@ object TestHdfsSystemProducerTestSuite { val JOB_NAME = "samza-hdfs-test-job" // write some data as BytesWritable val BATCH_JOB_NAME = "samza-hdfs-test-batch-job" // write enough binary data to force the producer to split partfiles val TEXT_JOB_NAME = "samza-hdfs-test-job-text" // write some data as String + val AVRO_JOB_NAME = "samza-hdfs-test-job-avro" // write some data as Avro val 
TEXT_BATCH_JOB_NAME = "samza-hdfs-test-batch-job-text" // force a file split, understanding that Text does some compressing + val AVRO_BATCH_JOB_NAME = "samza-hdfs-test-batch-job-avro" // force a file split, understanding that Avro does some compressing val RESOURCE_PATH_FORMAT = "file://%s/src/test/resources/%s.properties" val TEST_DATE = (new SimpleDateFormat("yyyy_MM_dd-HH")).format(new Date) @@ -60,6 +65,10 @@ object TestHdfsSystemProducerTestSuite { val EXPECTED = Array[String]("small_data", "medium_data", "large_data") val LUMP = new scala.util.Random().nextString(BATCH_SIZE) + case class AvroTestClass(a1: Long, b2: String) { + def this() = this(0L, "") + } + val hdfsFactory = new TestHdfsSystemFactory() val propsFactory = new PropertiesConfigFactory() val cluster = getMiniCluster @@ -256,6 +265,83 @@ class TestHdfsSystemProducerTestSuite extends Logging { } } + @Test + def testHdfsSystemProducerAvroWrite { + var producer: Option[HdfsSystemProducer] = None + + try { + producer = buildProducer(AVRO_JOB_NAME, cluster.get) + producer.get.register(TEST) + producer.get.start + + Thread.sleep(PAUSE) + + val systemStream = new SystemStream(AVRO_JOB_NAME, TEST) + val atc = new AvroTestClass(1280382045923456789L, "alkjdsfafloiqulkjasoiuqlklakdsflkja") + producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, atc)) + + producer.get.stop + producer = None + + val dfs = cluster.get.getFileSystem + val results = dfs.listStatus(testWritePath(AVRO_JOB_NAME)) + val bytesWritten = results.toList.foldLeft(0L) { (acc, status) => acc + status.getLen } + assertTrue(results.length == 1) + assertTrue(bytesWritten > 0L) + + val atf = new AvroFSInput(FileContext.getFileContext(), results.head.getPath) + val schema = ReflectData.get().getSchema(atc.getClass) + val datumReader = new ReflectDatumReader[Object](schema) + val tfReader = DataFileReader.openReader(atf, datumReader) + val atc2 = tfReader.next().asInstanceOf[AvroTestClass] + + assertTrue(atc == atc2) + + } finally { 
+ producer.map { _.stop } + } + } + + @Test + def testHdfsSystemProducerWriteAvroBatches { + var producer: Option[HdfsSystemProducer] = None + + try { + producer = buildProducer(AVRO_BATCH_JOB_NAME, cluster.get) + + producer.get.start + producer.get.register(TEST) + Thread.sleep(PAUSE) + + val systemStream = new SystemStream(AVRO_BATCH_JOB_NAME, TEST) + val atc = new AvroTestClass(1280382045923456789L, "alkjdsfafloiqulkjasoiuqlklakdsflkja") + + (1 to 20).map { + i => producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, atc)) + } + + producer.get.stop + producer = None + + val dfs = cluster.get.getFileSystem + val results = dfs.listStatus(testWritePath(AVRO_BATCH_JOB_NAME)) + // systems.samza-hdfs-test-batch-job-avro.producer.hdfs.write.batch.size.records=10 + assertEquals(2, results.length) + + results.foreach { r => + val atf = new AvroFSInput(FileContext.getFileContext(), r.getPath) + val schema = ReflectData.get().getSchema(atc.getClass) + val datumReader = new ReflectDatumReader[Object](schema) + val tfReader = DataFileReader.openReader(atf, datumReader) + val atc2 = tfReader.next().asInstanceOf[AvroTestClass] + assertTrue(atc == atc2) + } + + } finally { + producer.map { _.stop } + } + } + }
