Repository: incubator-samza
Updated Branches:
  refs/heads/master 494feb0a5 -> 95cee714e
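For context, SAMZA-138 adds a read-only "file reader" system (FileReaderSystemFactory, FileReaderSystemConsumer, FileReaderSystemAdmin) that exposes a local file as a single-partition stream whose offsets are the byte positions of line starts. Below is a minimal wiring sketch, not part of this commit, assuming org.apache.samza.config.MapConfig and the two optional settings the factory reads (systems.<system>.queue.size, default 10000, and systems.<system>.polling.sleep.ms, default 500):

import org.apache.samza.config.MapConfig
import org.apache.samza.system.filereader.FileReaderSystemFactory
import scala.collection.JavaConversions._

// Hedged wiring sketch, not part of this commit.
object FileReaderWiring extends App {
  val config = new MapConfig(Map(
    "systems.file-reader.queue.size" -> "10000",      // bound on buffered IncomingMessageEnvelopes (default 10000)
    "systems.file-reader.polling.sleep.ms" -> "500")) // ms to sleep between file-length checks (default 500)
  val factory = new FileReaderSystemFactory
  val admin = factory.getAdmin("file-reader", config)
  val consumer = factory.getConsumer("file-reader", config, null) // the tests also pass a null MetricsRegistry
  // factory.getProducer(...) deliberately throws SamzaException: the system is read-only.
}

In a job configuration the same keys would appear under whatever system name is chosen, with the stream name being the path of the file to read.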
SAMZA-138; add a file system consumer Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/95cee714 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/95cee714 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/95cee714 Branch: refs/heads/master Commit: 95cee714ef1d017f948233396ca663064df28ab4 Parents: 494feb0 Author: Yan Fang <[email protected]> Authored: Fri May 23 11:18:04 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Fri May 23 11:18:04 2014 -0700 ---------------------------------------------------------------------- .../filereader/FileReaderSystemAdmin.scala | 133 +++++++++++++++ .../filereader/FileReaderSystemConsumer.scala | 171 +++++++++++++++++++ .../filereader/FileReaderSystemFactory.scala | 53 ++++++ .../filereader/TestFileReaderSystemAdmin.scala | 101 +++++++++++ .../TestFileReaderSystemConsumer.scala | 127 ++++++++++++++ .../TestFileReaderSystemFactory.scala | 42 +++++ 6 files changed, 627 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/95cee714/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala new file mode 100644 index 0000000..9c99a59 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.filereader + +import org.apache.samza.system.SystemAdmin +import org.apache.samza.system.SystemStreamMetadata +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import scala.collection.JavaConversions._ +import java.io.RandomAccessFile +import scala.util.control.Breaks +import org.apache.samza.Partition +import grizzled.slf4j.Logging +import org.apache.samza.SamzaException + +class FileReaderSystemAdmin extends SystemAdmin with Logging { + /** + * Given a list of streams, get their metadata. This method gets newest offset and upcoming + * offset by reading the file and then put them into SystemStreamPartitionMetadata. If the + * file is empty, it will use null for oldest and newest offset and "0" for upcoming offset. + * The metadata is a map (Partition, SystemStreamPartitionMetadata). Here, we only use one partition + * for each file. 
This method returns a map, whose key is stream name and whose value is the metadata. + * + * @see getNewestOffsetAndUpcomingOffset(RandomAccessFile) + */ + def getSystemStreamMetadata(streams: java.util.Set[String]) = { + val allMetadata = streams.map(stream => { + val file = new RandomAccessFile(stream, "r") + val systemStreamPartitionMetadata = file.length match { + case 0 => new SystemStreamPartitionMetadata(null, null, "0") + case _ => { + val (newestOffset, upcomingOffset) = getNewestOffsetAndUpcomingOffset(file) + new SystemStreamPartitionMetadata("0", newestOffset, upcomingOffset) + } + } + file.close + val streamPartitionMetadata = Map(new Partition(0) -> systemStreamPartitionMetadata) + val systemStreamMetadata = new SystemStreamMetadata(stream, streamPartitionMetadata) + (stream, systemStreamMetadata) + }).toMap + + info("Got metadata: %s" format allMetadata) + + allMetadata + } + + /** + * This method looks for the location of next newline in the file based on supplied offset. + * It first finds out the \n position of the line which starts with the supplied offset. The + * next newline's offset will be the \n position + 1. + * + * If we can not find the \n position in the supplied offset's line, throw a SamzaException. Because + * we are supposed to only call this method in fully consumed messages. + */ + def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = { + val offsetAfter = offsets.map { + case (systemStreamPartition, offset) => { + val file = new RandomAccessFile(systemStreamPartition.getStream, "r") + val newOffset = findNextEnter(file, offset.toLong, 1) match { + case Some(x) => x + 1 + case None => throw new SamzaException("the line beginning with " + offset + " in " + systemStreamPartition.getStream + " has not been completed!") + } + (systemStreamPartition, newOffset.toString) + } + } + mapAsJavaMap(offsetAfter) + } + + /** + * Get the newest offset and upcoming offset from a file. The newest offset is the offset of + * second-to-last \n in the file + 1. The upcoming offset is the offset of last \n + 1. If + * there are not enough \n in the file, default value is 0. + * + * This method reads file backwards until reach the second-to-last \n. The assumption is, in most cases, + * there are more bytes to second-to-last \n from beginning than from ending. + */ + private def getNewestOffsetAndUpcomingOffset(file: RandomAccessFile): (String, String) = { + var newestOffset = 0 + val upcomingOffset = findNextEnter(file, file.length - 1, -1) match { + case Some(x) => x + 1 + case None => 0 + } + // if we can not find upcomingOffset, we can not find newest offset either. + if (upcomingOffset != 0) { + // upcomingOffset - 2 is the offset of the byte before the last \n + newestOffset = findNextEnter(file, upcomingOffset - 2, -1) match { + case Some(x) => x + 1 + case None => 0 + } + } + (newestOffset.toString, upcomingOffset.toString) + } + + /** + * This method is to find the next \n in the file according to the starting position provided. + * If the step is +1, it will look for the \n after this position; if the step is -1, it will + * look for the \n before this starting position. If it finds the \n, return the position of + * this \n, otherwise, it returns -1. 
+ */ + private def findNextEnter(file: RandomAccessFile, startingPosition: Long, step: Int): Option[Int] = { + var enterPosition: Option[Int] = None + var i = startingPosition + val loop = new Breaks + loop.breakable( + while (i < file.length && i > -1) { + file.seek(i) + val cha = file.read.toChar + if (cha == '\n') { + enterPosition = Some(i.toInt) + loop.break + } + i = i + step + }) + enterPosition + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/95cee714/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala new file mode 100644 index 0000000..c0e1bb6 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.filereader + +import org.apache.samza.system.SystemConsumer +import org.apache.samza.util.BlockingEnvelopeMap +import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.system.SystemStreamPartition +import scala.collection.mutable.Map +import java.io.RandomAccessFile +import org.apache.samza.system.IncomingMessageEnvelope +import java.util.concurrent.LinkedBlockingQueue +import org.apache.samza.Partition +import collection.JavaConversions._ +import scala.collection.mutable.HashMap +import java.util.concurrent.Executors +import java.util.concurrent.ExecutorService +import org.apache.samza.util.DaemonThreadFactory +import org.apache.samza.SamzaException +import grizzled.slf4j.Logging + +object FileReaderSystemConsumer { + /** + * prefix for the file reader system thread names + */ + val FILE_READER_SYSTEM_THREAD_PREFIX = "filereader-" +} + +class FileReaderSystemConsumer( + systemName: String, + metricsRegistry: MetricsRegistry, + + /** + * Threshold used to determine when there are too many IncomingMessageEnvelopes to be put onto + * the BlockingQueue. + */ + queueSize: Int = 10000, + + /** + * the sleep interval of checking the file length. Unit of the time is milliseconds. + */ + pollingSleepMs: Int = 500) extends BlockingEnvelopeMap with Logging { + + /** + * a map for storing a systemStreamPartition and its starting offset. + */ + var systemStreamPartitionAndStartingOffset = Map[SystemStreamPartition, String]() + + /** + * a thread pool for the threads reading files. + * The size of the pool equals to the number of files to read. 
+ */ + var pool: ExecutorService = null + + /** + * register the systemStreamPartition and put they SystemStreampartition and its starting offset + * into the systemStreamPartitionAndStartingOffset map + */ + override def register(systemStreamPartition: SystemStreamPartition, startingOffset: String) { + super.register(systemStreamPartition, startingOffset) + systemStreamPartitionAndStartingOffset += ((systemStreamPartition, startingOffset)) + } + + /** + * start one thread for each file reader + */ + override def start { + pool = Executors.newFixedThreadPool(systemStreamPartitionAndStartingOffset.size, new DaemonThreadFactory(FileReaderSystemConsumer.FILE_READER_SYSTEM_THREAD_PREFIX)) + systemStreamPartitionAndStartingOffset.map { case (ssp, offset) => pool.execute(readInputFiles(ssp, offset)) } + } + + /** + * Stop all the running threads + */ + override def stop { + pool.shutdown + } + + /** + * The method returns a runnable object, which reads a file until reach the end of the file. It puts + * every line (ends with \n) and its offset (the beginning of the line) into BlockingQueue. If a line + * is not ended with \n, it is thought as uncompleted. Therefore the thread will wait until the line + * is completed and then put it into queue. The thread keeps comparing the file length with file pointer + * to read the latest/updated file content. If the file is read to the end of current content, setIsHead() + * is called to specify that the SystemStreamPartition has "caught up". The thread sleep time between + * two compares is determined by <code>pollingSleepMs</code> + */ + private def readInputFiles(ssp: SystemStreamPartition, startingOffset: String) = { + new Runnable { + @volatile var shutdown = false // tag to indicate the thread should stop running + + def run() { + val path = ssp.getStream + var file: RandomAccessFile = null + var filePointer = startingOffset.toLong + var line = "" // used to form a line of characters + var offset = filePointer // record the beginning offset of a line + try { + file = new RandomAccessFile(path, "r") + while (!shutdown) { + if (file.length <= filePointer) { + Thread.sleep(pollingSleepMs) + file.close + file = new RandomAccessFile(path, "r") + } else { + file.seek(filePointer) + var i = filePointer + while (i < file.length) { + val cha = file.read.toChar + if (cha == '\n') { + // put into the queue. offset is the beginning of this line + put(ssp, new IncomingMessageEnvelope(ssp, offset.toString, null, line)); + offset = i + 1 // the beginning of the newline + line = "" + } else { + line = line + cha + } + i += 1 + } + filePointer = file.length + setIsAtHead(ssp, true) + } + } + } catch { + case ie: InterruptedException => { + // Swallow this exception since we don't need to clutter the logs + // with interrupt exceptions when shutting down. + info("Received an interrupt while reading file. Shutting down.") + } + } finally { + if (file != null) { + file.close + } + } + } + + // stop the thread gracefully by changing the variable's value + def stop() { + shutdown = true + } + } + } + + /** + * Constructs a new bounded BlockingQueue of IncomingMessageEnvelopes. The bound is determined + * by the <code>BOUNDED_QUEUE_THRESHOLD</code> constant. + * + * @return A bounded queue used for queueing IncomingMessageEnvelopes to be sent to their + * specified destinations. 
+ */ + override def newBlockingQueue = { + new LinkedBlockingQueue[IncomingMessageEnvelope](queueSize); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/95cee714/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala new file mode 100644 index 0000000..9f2bb17 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.filereader + +import org.apache.samza.system.SystemFactory +import org.apache.samza.config.Config +import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.SamzaException + +class FileReaderSystemFactory extends SystemFactory { + /** + * get the FileReaderSystemConsumer. It also tries to get the queue size + * and polling sleep time from config file. If they do not exist, will use the default + * value. + */ + def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = { + val queueSize = config.getInt("systems." + systemName + ".queue.size", 10000) + val pollingSleepMs = config.getInt("systems." + systemName + ".polling.sleep.ms", 500) + new FileReaderSystemConsumer(systemName, registry, queueSize, pollingSleepMs) + } + + /** + * this system is not designed for writing to files. So disable the producer method. + * It throws Exception when the system tries to getProducer. 
+ */ + def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = { + throw new SamzaException("not supposed to write to files") + } + + /** + * get the FileReaderSystemAdmin + */ + def getAdmin(systemName: String, config: Config) = { + new FileReaderSystemAdmin + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/95cee714/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala new file mode 100644 index 0000000..fb26bfc --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.filereader + +import org.junit.Assert._ +import scala.collection.JavaConversions._ +import java.io.PrintWriter +import java.io.File +import org.scalatest.junit.AssertionsForJUnit +import org.junit.Test +import org.junit.Before +import org.junit.After +import java.io.RandomAccessFile +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.Partition +import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import scala.collection.mutable.HashMap +import org.apache.samza.SamzaException + +class TestFileReaderSystemAdmin extends AssertionsForJUnit { + + val files = List("empty.txt", "noEnter.txt", "oneEnter.txt", "twoEnter.txt", "moreEnter.txt") + + @Before + def createFiles { + files.foreach(file => { + val writer = new PrintWriter(new File(file)) + file match { + case "empty.txt" => + case "noEnter.txt" => writer.write("first line") + case "oneEnter.txt" => writer.write("first line \nsecond line") + case "twoEnter.txt" => writer.write("first line \nsecond line \nother lines") + case "moreEnter.txt" => writer.write("first line \nsecond line \nthird line \nother lines \n") + } + writer.close + }) + } + + @After + def deleteFiles { + files.foreach(file => (new File(file)).delete) + } + + @Test + def testGetOffsetsAfter { + val fileReaderSystemAdmin = new FileReaderSystemAdmin + val ssp1 = new SystemStreamPartition("file-reader", files(0), new Partition(0)) + val ssp2 = new SystemStreamPartition("file-reader", files(1), new Partition(0)) + val ssp3 = new SystemStreamPartition("file-reader", files(2), new Partition(0)) + val ssp4 = new SystemStreamPartition("file-reader", files(3), new Partition(0)) + val ssp5 = new SystemStreamPartition("file-reader", files(4), new Partition(0)) + + val offsets: 
java.util.Map[SystemStreamPartition, String] = + HashMap(ssp3 -> "0", ssp4 -> "12", ssp5 -> "25") + val afterOffsets = fileReaderSystemAdmin.getOffsetsAfter(offsets) + assertEquals("12", afterOffsets.get(ssp3)) + assertEquals("25", afterOffsets.get(ssp4)) + assertEquals("37", afterOffsets.get(ssp5)) + } + + @Test + def testGetSystemStreamMetadata { + val fileReaderSystemAdmin = new FileReaderSystemAdmin + val allMetadata = fileReaderSystemAdmin.getSystemStreamMetadata(setAsJavaSet(files.toSet)) + val expectedEmpty = new SystemStreamPartitionMetadata(null, null, "0") + val expectedNoEntry = new SystemStreamPartitionMetadata("0", "0", "0") + val expectedOneEntry = new SystemStreamPartitionMetadata("0", "0", "12") + val expectedTwoEntry = new SystemStreamPartitionMetadata("0", "12", "25") + val expectedMoreEntry = new SystemStreamPartitionMetadata("0", "37", "50") + + allMetadata.foreach { entry => + { + val result = (entry._2).getSystemStreamPartitionMetadata().get(new Partition(0)) + entry._1 match { + case "empty.txt" => assertEquals(expectedEmpty, result) + case "noEnter.txt" => assertEquals(expectedNoEntry, result) + case "oneEnter.txt" => assertEquals(expectedOneEntry, result) + case "twoEnter.txt" => assertEquals(expectedTwoEntry, result) + case "moreEnter.txt" => assertEquals(expectedMoreEntry, result) + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/95cee714/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala new file mode 100644 index 0000000..b2e04a7 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.system.filereader + +import org.junit.Test +import org.junit.Assert._ +import org.apache.samza.system.SystemStreamPartition +import org.junit.AfterClass +import java.io.PrintWriter +import java.io.File +import org.apache.samza.Partition +import scala.collection.JavaConversions._ +import scala.collection.mutable.HashMap +import org.junit.BeforeClass +import java.io.FileWriter + +object TestFileReaderSystemConsumer { + val consumer = new FileReaderSystemConsumer("file-reader", null) + val files = List("empty.txt", "noEnter.txt", "oneEnter.txt", "twoEnter.txt", "moreEnter.txt") + val ssp1 = new SystemStreamPartition("file-reader", files(0), new Partition(0)) + val ssp2 = new SystemStreamPartition("file-reader", files(1), new Partition(0)) + val ssp3 = new SystemStreamPartition("file-reader", files(2), new Partition(0)) + val ssp4 = new SystemStreamPartition("file-reader", files(3), new Partition(0)) + val ssp5 = new SystemStreamPartition("file-reader", files(4), new Partition(0)) + + @BeforeClass + def beforeCreateFiles { + files.foreach(file => { + val writer = new PrintWriter(new File(file)) + file match { + case "empty.txt" => + case "noEnter.txt" => writer.write("first line") + case "oneEnter.txt" => writer.write("first line \nsecond line") + case "twoEnter.txt" => writer.write("first line \nsecond line \nother lines") + case "moreEnter.txt" => writer.write("first line \nsecond line \nthird line \nother lines \n") + } + writer.close + }) + } + + @AfterClass + def afterDeleteFiles { + files.foreach(file => (new File(file)).delete) + } + + def appendFile { + val fileWriter = new FileWriter("moreEnter.txt", true); + fileWriter.write("This is a new line\n"); + fileWriter.close + } +} + +class TestFileReaderSystemConsumer { + import TestFileReaderSystemConsumer._ + + @Test + def testRegisterAndPutCorrectMessagesOffsetsToBlockingQueue { + consumer.register(ssp1, "0") + consumer.register(ssp2, "0") + consumer.register(ssp3, "0") + consumer.register(ssp4, "12") + consumer.register(ssp5, "25") + + // test register correctly + assertEquals("0", consumer.systemStreamPartitionAndStartingOffset.getOrElse(ssp1, null)) + assertEquals("0", consumer.systemStreamPartitionAndStartingOffset.getOrElse(ssp2, null)) + assertEquals("0", consumer.systemStreamPartitionAndStartingOffset.getOrElse(ssp3, null)) + assertEquals("12", consumer.systemStreamPartitionAndStartingOffset.getOrElse(ssp4, null)) + assertEquals("25", consumer.systemStreamPartitionAndStartingOffset.getOrElse(ssp5, null)) + + consumer.start + Thread.sleep(500) + + val number: Integer = 1000 + val ssp1Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp1 -> number) + val ssp2Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp2 -> number) + val ssp3Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp3 -> number) + val ssp4Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp4 -> number) + val ssp5Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp5 -> number) + + val ssp1Result = consumer.poll(ssp1Number, 1000) + val ssp2Result = consumer.poll(ssp2Number, 1000) + val ssp3Result = consumer.poll(ssp3Number, 1000) + val ssp4Result = consumer.poll(ssp4Number, 1000) + + assertEquals(0, ssp1Result.size) + assertEquals(0, ssp2Result.size) + + assertEquals(1, ssp3Result.size) + assertEquals("first line ", ssp3Result(0).getMessage) + assertEquals("0", ssp3Result(0).getOffset) + + assertEquals(1, ssp4Result.size) + assertEquals("second line ", 
ssp4Result(0).getMessage) + assertEquals("12", ssp4Result(0).getOffset) + + appendFile + Thread.sleep(1000) + + // ssp5 should read the new lines + val ssp5Result = consumer.poll(ssp5Number, 1000) + assertEquals(3, ssp5Result.size) + assertEquals("This is a new line", ssp5Result(2).getMessage) + assertEquals("50", ssp5Result(2).getOffset) + assertEquals("other lines ", ssp5Result(1).getMessage) + assertEquals("37", ssp5Result(1).getOffset) + + consumer.stop + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/95cee714/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala new file mode 100644 index 0000000..330df78 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.filereader + +import org.junit.Assert._ +import scala.collection.JavaConversions._ +import org.scalatest.junit.AssertionsForJUnit +import org.junit.Test +import org.apache.samza.SamzaException + +class TestFileReaderSystemFactory extends AssertionsForJUnit { + + @Test + def testGetProducerThrowCorrectException { + val fileReaderSystemFactory = new FileReaderSystemFactory + var correctException = false + try { + fileReaderSystemFactory.getProducer("", null, null) + } catch { + case e: SamzaException => correctException = true + case _: Throwable => correctException = false + } + assertTrue(correctException) + } +}
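To make the offset arithmetic concrete, here is an illustrative sketch (not part of the commit) that mirrors the test fixtures: for a file containing "first line \nsecond line \nother lines", the admin reports oldest offset "0", newest "12" (the byte where "second line" starts) and upcoming "25", getOffsetsAfter maps "12" to "25", and the consumer emits each completed line keyed by the byte offset of its first character:

import java.io.{File, PrintWriter}
import org.apache.samza.Partition
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.system.filereader.{FileReaderSystemAdmin, FileReaderSystemConsumer}
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap

// Illustrative sketch, not part of this commit; file name and contents mirror the test fixtures.
object FileReaderExample extends App {
  val path = "twoEnter.txt"
  val writer = new PrintWriter(new File(path))
  writer.write("first line \nsecond line \nother lines")
  writer.close

  // Admin side: per-file metadata and "offset after" lookups.
  val admin = new FileReaderSystemAdmin
  val metadata = admin.getSystemStreamMetadata(setAsJavaSet(Set(path)))
  metadata.foreach { entry =>
    // For this file: oldest "0", newest "12", upcoming "25".
    println(entry._2.getSystemStreamPartitionMetadata().get(new Partition(0)))
  }
  val ssp = new SystemStreamPartition("file-reader", path, new Partition(0))
  val offsets: java.util.Map[SystemStreamPartition, String] = HashMap(ssp -> "12")
  println(admin.getOffsetsAfter(offsets).get(ssp)) // "25": the line starting at byte 12 ends with the \n at 24

  // Consumer side: register a starting offset, start the reader thread, poll, stop.
  val consumer = new FileReaderSystemConsumer("file-reader", null) // null registry, as in the tests
  consumer.register(ssp, "0")
  consumer.start
  Thread.sleep(500) // give the reader thread time to catch up, as the tests do
  val request: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp -> Integer.valueOf(1000))
  val envelopes = consumer.poll(request, 1000)
  envelopes.foreach(e => println(e.getOffset + ": " + e.getMessage)) // offset = byte position of line start
  consumer.stop

  new File(path).delete
}

The trailing "other lines" has no terminating \n, so the consumer holds it back until the line is completed, matching the behavior described in readInputFiles.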
