[SPARK-7056] [STREAMING] Make the Write Ahead Log pluggable

Users may want the WAL data to be written to non-HDFS data storage systems. To allow that, we have to make the WAL pluggable. The following design doc outlines the plan.
https://docs.google.com/a/databricks.com/document/d/1A2XaOLRFzvIZSi18i_luNw5Rmm9j2j4AigktXxIYxmY/edit?usp=sharing

Things to add.
* Unit tests for WriteAheadLogUtils

Author: Tathagata Das <[email protected]>

Closes #5645 from tdas/wal-pluggable and squashes the following commits:

2c431fd [Tathagata Das] Minor fixes.
c2bc7384 [Tathagata Das] More changes based on PR comments.
569a416 [Tathagata Das] fixed long line
bde26b1 [Tathagata Das] Renamed segment to record handle everywhere
b65e155 [Tathagata Das] More changes based on PR comments.
d7cd15b [Tathagata Das] Fixed test
1a32a4b [Tathagata Das] Fixed test
e0d19fb [Tathagata Das] Fixed defaults
9310cbf [Tathagata Das] style fix.
86abcb1 [Tathagata Das] Refactored WriteAheadLogUtils, and consolidated all WAL related configuration into it.
84ce469 [Tathagata Das] Added unit test and fixed compilation error.
bce5e75 [Tathagata Das] Fixed long lines.
837c4f5 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into wal-pluggable
754fbf8 [Tathagata Das] Added license and docs.
09bc6fe [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into wal-pluggable
7dd2d4b [Tathagata Das] Added pluggable WriteAheadLog interface, and refactored all code along with it

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1868bd40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1868bd40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1868bd40

Branch: refs/heads/master
Commit: 1868bd40dcce23990b98748b0239bd00452b1ca5
Parents: c0c0ba6
Author: Tathagata Das <[email protected]>
Authored: Wed Apr 29 13:06:11 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Wed Apr 29 13:06:11 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/kafka/KafkaUtils.scala        |   3 +-
 .../spark/streaming/util/WriteAheadLog.java       |  60 +++++
 .../util/WriteAheadLogRecordHandle.java           |  30 +++
 .../dstream/ReceiverInputDStream.scala            |   2 +-
 .../rdd/WriteAheadLogBackedBlockRDD.scala         |  79 ++++--
 .../receiver/ReceivedBlockHandler.scala           |  38 ++-
 .../receiver/ReceiverSupervisorImpl.scala         |   5 +-
 .../scheduler/ReceivedBlockTracker.scala          |  38 ++-
 .../streaming/scheduler/ReceiverTracker.scala     |   3 +-
 .../streaming/util/FileBasedWriteAheadLog.scala   | 249 +++++++++++++++++++
 .../FileBasedWriteAheadLogRandomReader.scala      |  54 ++++
 .../util/FileBasedWriteAheadLogReader.scala       |  82 ++++++
 .../util/FileBasedWriteAheadLogSegment.scala      |  21 ++
 .../util/FileBasedWriteAheadLogWriter.scala       |  81 ++++++
 .../util/WriteAheadLogFileSegment.scala           |  20 --
 .../streaming/util/WriteAheadLogManager.scala     | 233 -----------------
 .../util/WriteAheadLogRandomReader.scala          |  54 ----
 .../streaming/util/WriteAheadLogReader.scala      |  82 ------
 .../streaming/util/WriteAheadLogUtils.scala       | 129 ++++++++++
 .../streaming/util/WriteAheadLogWriter.scala      |  82 ------
 .../spark/streaming/JavaWriteAheadLogSuite.java   | 129 ++++++++++
 .../streaming/ReceivedBlockHandlerSuite.scala     |  18 +-
 .../streaming/ReceivedBlockTrackerSuite.scala     |  28 ++-
 .../apache/spark/streaming/ReceiverSuite.scala    |   2 +-
 .../spark/streaming/StreamingContextSuite.scala   |   4 +-
 .../rdd/WriteAheadLogBackedBlockRDDSuite.scala    |  31 +--
 .../streaming/util/WriteAheadLogSuite.scala       | 194 ++++++++++-----
 27 files changed, 1115 insertions(+), 636 deletions(-)
----------------------------------------------------------------------
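For orientation before the diff: the pluggability introduced here comes down to subclassing the two new abstract classes, WriteAheadLog and WriteAheadLogRecordHandle, whose full sources appear below. The following minimal, hypothetical in-memory implementation (Scala, illustration only; the names InMemoryWriteAheadLog and InMemoryRecordHandle are invented for this sketch) shows the shape of such a plugin. A real plugin must make the data durable before write() returns, and its class name still has to be supplied through the configuration read by the new WriteAheadLogUtils helper, whose source is not part of this excerpt.

import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}

import scala.collection.JavaConversions.asJavaIterator  // same JavaConversions style the diff uses
import scala.collection.mutable

import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

// Hypothetical handle type: an opaque id pointing into the in-memory store.
case class InMemoryRecordHandle(id: Long) extends WriteAheadLogRecordHandle

// Toy WriteAheadLog that keeps records in memory. NOT durable -- for illustration only.
class InMemoryWriteAheadLog extends WriteAheadLog {
  private var nextId = 0L
  // id -> (record time, data), kept in insertion order so readAll() replays writes in order
  private val records = mutable.LinkedHashMap[Long, (Long, ByteBuffer)]()

  override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = synchronized {
    val handle = InMemoryRecordHandle(nextId)
    records(nextId) = (time, record)
    nextId += 1
    handle
  }

  override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = synchronized {
    records(handle.asInstanceOf[InMemoryRecordHandle].id)._2
  }

  override def readAll(): JIterator[ByteBuffer] = synchronized {
    records.values.map(_._2).toIterator
  }

  override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = synchronized {
    // Drop every record whose write time is older than the threshold
    val expired = records.collect { case (id, (time, _)) if time < threshTime => id }
    expired.foreach(records.remove)
  }

  override def close(): Unit = synchronized { records.clear() }
}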
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 0721dda..d7cf500 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -31,6 +31,7 @@ import kafka.message.MessageAndMetadata import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder} import org.apache.spark.api.java.function.{Function => JFunction} +import org.apache.spark.streaming.util.WriteAheadLogUtils import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD @@ -80,7 +81,7 @@ object KafkaUtils { topics: Map[String, Int], storageLevel: StorageLevel ): ReceiverInputDStream[(K, V)] = { - val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false) + val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf) new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel) } http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java new file mode 100644 index 0000000..8c0fdfa --- /dev/null +++ b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util; + +import java.nio.ByteBuffer; +import java.util.Iterator; + +/** + * This abstract class represents a write ahead log (aka journal) that is used by Spark Streaming + * to save the received data (by receivers) and associated metadata to a reliable storage, so that + * they can be recovered after driver failures. See the Spark documentation for more information + * on how to plug in your own custom implementation of a write ahead log. + */ [email protected] +public abstract class WriteAheadLog { + /** + * Write the record to the log and return a record handle, which contains all the information + * necessary to read back the written record. The time is used to the index the record, + * such that it can be cleaned later. 
Note that implementations of this abstract class must + * ensure that the written data is durable and readable (using the record handle) by the + * time this function returns. + */ + abstract public WriteAheadLogRecordHandle write(ByteBuffer record, long time); + + /** + * Read a written record based on the given record handle. + */ + abstract public ByteBuffer read(WriteAheadLogRecordHandle handle); + + /** + * Read and return an iterator of all the records that have been written but not yet cleaned up. + */ + abstract public Iterator<ByteBuffer> readAll(); + + /** + * Clean all the records that are older than the threshold time. It can wait for + * the completion of the deletion. + */ + abstract public void clean(long threshTime, boolean waitForCompletion); + + /** + * Close this log and release any resources. + */ + abstract public void close(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java new file mode 100644 index 0000000..0232418 --- /dev/null +++ b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util; + +/** + * This abstract class represents a handle that refers to a record written in a + * {@link org.apache.spark.streaming.util.WriteAheadLog WriteAheadLog}. + * It must contain all the information necessary for the record to be read and returned by + * an implemenation of the WriteAheadLog class. + * + * @see org.apache.spark.streaming.util.WriteAheadLog + */ [email protected] +public abstract class WriteAheadLogRecordHandle implements java.io.Serializable { +} http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 8be0431..4c7fd2c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -82,7 +82,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont // WriteAheadLogBackedBlockRDD else create simple BlockRDD. 
if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) { val logSegments = blockStoreResults.map { - _.asInstanceOf[WriteAheadLogBasedStoreResult].segment + _.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle }.toArray // Since storeInBlockManager = false, the storage level does not matter. new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext, http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index 93caa4b..ebdf418 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -16,14 +16,17 @@ */ package org.apache.spark.streaming.rdd +import java.nio.ByteBuffer + import scala.reflect.ClassTag +import scala.util.control.NonFatal -import org.apache.hadoop.conf.Configuration +import org.apache.commons.io.FileUtils import org.apache.spark._ import org.apache.spark.rdd.BlockRDD import org.apache.spark.storage.{BlockId, StorageLevel} -import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, WriteAheadLogRandomReader} +import org.apache.spark.streaming.util._ /** * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]]. @@ -31,26 +34,27 @@ import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, Wri * the segment of the write ahead log that backs the partition. * @param index index of the partition * @param blockId id of the block having the partition data - * @param segment segment of the write ahead log having the partition data + * @param walRecordHandle Handle of the record in a write ahead log having the partition data */ private[streaming] class WriteAheadLogBackedBlockRDDPartition( val index: Int, val blockId: BlockId, - val segment: WriteAheadLogFileSegment) + val walRecordHandle: WriteAheadLogRecordHandle) extends Partition /** * This class represents a special case of the BlockRDD where the data blocks in - * the block manager are also backed by segments in write ahead logs. For reading + * the block manager are also backed by data in write ahead logs. For reading * the data, this RDD first looks up the blocks by their ids in the block manager. - * If it does not find them, it looks up the corresponding file segment. + * If it does not find them, it looks up the corresponding data in the write ahead log. 
* * @param sc SparkContext * @param blockIds Ids of the blocks that contains this RDD's data - * @param segments Segments in write ahead logs that contain this RDD's data - * @param storeInBlockManager Whether to store in the block manager after reading from the segment + * @param walRecordHandles Record handles in write ahead logs that contain this RDD's data + * @param storeInBlockManager Whether to store in the block manager after reading + * from the WAL record * @param storageLevel storage level to store when storing in block manager * (applicable when storeInBlockManager = true) */ @@ -58,15 +62,15 @@ private[streaming] class WriteAheadLogBackedBlockRDD[T: ClassTag]( @transient sc: SparkContext, @transient blockIds: Array[BlockId], - @transient segments: Array[WriteAheadLogFileSegment], + @transient walRecordHandles: Array[WriteAheadLogRecordHandle], storeInBlockManager: Boolean, storageLevel: StorageLevel) extends BlockRDD[T](sc, blockIds) { require( - blockIds.length == segments.length, + blockIds.length == walRecordHandles.length, s"Number of block ids (${blockIds.length}) must be " + - s"the same as number of segments (${segments.length}})!") + s"the same as number of WAL record handles (${walRecordHandles.length}})!") // Hadoop configuration is not serializable, so broadcast it as a serializable. @transient private val hadoopConfig = sc.hadoopConfiguration @@ -75,13 +79,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( override def getPartitions: Array[Partition] = { assertValid() Array.tabulate(blockIds.size) { i => - new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i)) + new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), walRecordHandles(i)) } } /** * Gets the partition data by getting the corresponding block from the block manager. - * If the block does not exist, then the data is read from the corresponding segment + * If the block does not exist, then the data is read from the corresponding record * in write ahead log files. */ override def compute(split: Partition, context: TaskContext): Iterator[T] = { @@ -96,10 +100,35 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( logDebug(s"Read partition data of $this from block manager, block $blockId") iterator case None => // Data not found in Block Manager, grab it from write ahead log file - val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf) - val dataRead = reader.read(partition.segment) - reader.close() - logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}") + var dataRead: ByteBuffer = null + var writeAheadLog: WriteAheadLog = null + try { + // The WriteAheadLogUtils.createLog*** method needs a directory to create a + // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for + // writing log data. However, the directory is not needed if data needs to be read, hence + // a dummy path is provided to satisfy the method parameter requirements. + // FileBasedWriteAheadLog will not create any file or directory at that path. 
+ val dummyDirectory = FileUtils.getTempDirectoryPath() + writeAheadLog = WriteAheadLogUtils.createLogForReceiver( + SparkEnv.get.conf, dummyDirectory, hadoopConf) + dataRead = writeAheadLog.read(partition.walRecordHandle) + } catch { + case NonFatal(e) => + throw new SparkException( + s"Could not read data from write ahead log record ${partition.walRecordHandle}", e) + } finally { + if (writeAheadLog != null) { + writeAheadLog.close() + writeAheadLog = null + } + } + if (dataRead == null) { + throw new SparkException( + s"Could not read data from write ahead log record ${partition.walRecordHandle}, " + + s"read returned null") + } + logInfo(s"Read partition data of $this from write ahead log, record handle " + + partition.walRecordHandle) if (storeInBlockManager) { blockManager.putBytes(blockId, dataRead, storageLevel) logDebug(s"Stored partition data of $this into block manager with level $storageLevel") @@ -111,14 +140,20 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( /** * Get the preferred location of the partition. This returns the locations of the block - * if it is present in the block manager, else it returns the location of the - * corresponding segment in HDFS. + * if it is present in the block manager, else if FileBasedWriteAheadLogSegment is used, + * it returns the location of the corresponding file segment in HDFS . */ override def getPreferredLocations(split: Partition): Seq[String] = { val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition] val blockLocations = getBlockIdLocations().get(partition.blockId) - blockLocations.getOrElse( - HdfsUtils.getFileSegmentLocations( - partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig)) + blockLocations.getOrElse { + partition.walRecordHandle match { + case fileSegment: FileBasedWriteAheadLogSegment => + HdfsUtils.getFileSegmentLocations( + fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig) + case _ => + Seq.empty + } + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 297bf04..4b3d9ee 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -17,18 +17,18 @@ package org.apache.spark.streaming.receiver -import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} import scala.language.{existentials, postfixOps} -import WriteAheadLogBasedBlockHandler._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.storage._ -import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager} -import org.apache.spark.util.{ThreadUtils, Clock, SystemClock} +import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._ +import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils} +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} +import org.apache.spark.{Logging, SparkConf, SparkException} /** Trait 
that represents the metadata related to storage of blocks */ private[streaming] trait ReceivedBlockStoreResult { @@ -96,7 +96,7 @@ private[streaming] class BlockManagerBasedBlockHandler( */ private[streaming] case class WriteAheadLogBasedStoreResult( blockId: StreamBlockId, - segment: WriteAheadLogFileSegment + walRecordHandle: WriteAheadLogRecordHandle ) extends ReceivedBlockStoreResult @@ -116,10 +116,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler( private val blockStoreTimeout = conf.getInt( "spark.streaming.receiver.blockStoreTimeout", 30).seconds - private val rollingInterval = conf.getInt( - "spark.streaming.receiver.writeAheadLog.rollingInterval", 60) - private val maxFailures = conf.getInt( - "spark.streaming.receiver.writeAheadLog.maxFailures", 3) private val effectiveStorageLevel = { if (storageLevel.deserialized) { @@ -139,13 +135,9 @@ private[streaming] class WriteAheadLogBasedBlockHandler( s"$effectiveStorageLevel when write ahead log is enabled") } - // Manages rolling log files - private val logManager = new WriteAheadLogManager( - checkpointDirToLogDir(checkpointDir, streamId), - hadoopConf, rollingInterval, maxFailures, - callerName = this.getClass.getSimpleName, - clock = clock - ) + // Write ahead log manages + private val writeAheadLog = WriteAheadLogUtils.createLogForReceiver( + conf, checkpointDirToLogDir(checkpointDir, streamId), hadoopConf) // For processing futures used in parallel block storing into block manager and write ahead log // # threads = 2, so that both writing to BM and WAL can proceed in parallel @@ -183,21 +175,21 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Store the block in write ahead log val storeInWriteAheadLogFuture = Future { - logManager.writeToLog(serializedBlock) + writeAheadLog.write(serializedBlock, clock.getTimeMillis()) } - // Combine the futures, wait for both to complete, and return the write ahead log segment + // Combine the futures, wait for both to complete, and return the write ahead log record handle val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2) - val segment = Await.result(combinedFuture, blockStoreTimeout) - WriteAheadLogBasedStoreResult(blockId, segment) + val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout) + WriteAheadLogBasedStoreResult(blockId, walRecordHandle) } def cleanupOldBlocks(threshTime: Long) { - logManager.cleanupOldLogs(threshTime, waitForCompletion = false) + writeAheadLog.clean(threshTime, false) } def stop() { - logManager.stop() + writeAheadLog.close() } } http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index f237936..93f047b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -25,12 +25,13 @@ import scala.collection.mutable.ArrayBuffer import com.google.common.base.Throwables import org.apache.hadoop.conf.Configuration -import org.apache.spark.{Logging, SparkEnv, SparkException} import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.storage.StreamBlockId import 
org.apache.spark.streaming.Time import org.apache.spark.streaming.scheduler._ +import org.apache.spark.streaming.util.WriteAheadLogUtils import org.apache.spark.util.{RpcUtils, Utils} +import org.apache.spark.{Logging, SparkEnv, SparkException} /** * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]] @@ -46,7 +47,7 @@ private[streaming] class ReceiverSupervisorImpl( ) extends ReceiverSupervisor(receiver, env.conf) with Logging { private val receivedBlockHandler: ReceivedBlockHandler = { - if (env.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) { + if (WriteAheadLogUtils.enableReceiverLog(env.conf)) { if (checkpointDirOption.isEmpty) { throw new SparkException( "Cannot enable receiver write-ahead log without checkpoint directory set. " + http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index 200cf4e..14e769a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -25,10 +25,10 @@ import scala.language.implicitConversions import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkException, Logging, SparkConf} import org.apache.spark.streaming.Time -import org.apache.spark.streaming.util.WriteAheadLogManager +import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogUtils} import org.apache.spark.util.{Clock, Utils} +import org.apache.spark.{Logging, SparkConf, SparkException} /** Trait representing any event in the ReceivedBlockTracker that updates its state. */ private[streaming] sealed trait ReceivedBlockTrackerLogEvent @@ -70,7 +70,7 @@ private[streaming] class ReceivedBlockTracker( private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue] private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks] - private val logManagerOption = createLogManager() + private val writeAheadLogOption = createWriteAheadLog() private var lastAllocatedBatchTime: Time = null @@ -155,12 +155,12 @@ private[streaming] class ReceivedBlockTracker( logInfo("Deleting batches " + timesToCleanup) writeToLog(BatchCleanupEvent(timesToCleanup)) timeToAllocatedBlocks --= timesToCleanup - logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds, waitForCompletion)) + writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion)) } /** Stop the block tracker. 
*/ def stop() { - logManagerOption.foreach { _.stop() } + writeAheadLogOption.foreach { _.close() } } /** @@ -190,9 +190,10 @@ private[streaming] class ReceivedBlockTracker( timeToAllocatedBlocks --= batchTimes } - logManagerOption.foreach { logManager => + writeAheadLogOption.foreach { writeAheadLog => logInfo(s"Recovering from write ahead logs in ${checkpointDirOption.get}") - logManager.readFromLog().foreach { byteBuffer => + import scala.collection.JavaConversions._ + writeAheadLog.readAll().foreach { byteBuffer => logTrace("Recovering record " + byteBuffer) Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) match { case BlockAdditionEvent(receivedBlockInfo) => @@ -208,10 +209,10 @@ private[streaming] class ReceivedBlockTracker( /** Write an update to the tracker to the write ahead log */ private def writeToLog(record: ReceivedBlockTrackerLogEvent) { - if (isLogManagerEnabled) { + if (isWriteAheadLogEnabled) { logDebug(s"Writing to log $record") - logManagerOption.foreach { logManager => - logManager.writeToLog(ByteBuffer.wrap(Utils.serialize(record))) + writeAheadLogOption.foreach { logManager => + logManager.write(ByteBuffer.wrap(Utils.serialize(record)), clock.getTimeMillis()) } } } @@ -222,8 +223,8 @@ private[streaming] class ReceivedBlockTracker( } /** Optionally create the write ahead log manager only if the feature is enabled */ - private def createLogManager(): Option[WriteAheadLogManager] = { - if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) { + private def createWriteAheadLog(): Option[WriteAheadLog] = { + if (WriteAheadLogUtils.enableReceiverLog(conf)) { if (checkpointDirOption.isEmpty) { throw new SparkException( "Cannot enable receiver write-ahead log without checkpoint directory set. " + @@ -231,19 +232,16 @@ private[streaming] class ReceivedBlockTracker( "See documentation for more details.") } val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get) - val rollingIntervalSecs = conf.getInt( - "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60) - val logManager = new WriteAheadLogManager(logDir, hadoopConf, - rollingIntervalSecs = rollingIntervalSecs, clock = clock, - callerName = "ReceivedBlockHandlerMaster") - Some(logManager) + + val log = WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf) + Some(log) } else { None } } - /** Check if the log manager is enabled. This is only used for testing purposes. */ - private[streaming] def isLogManagerEnabled: Boolean = logManagerOption.nonEmpty + /** Check if the write ahead log is enabled. This is only used for testing purposes. 
*/ + private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty } private[streaming] object ReceivedBlockTracker { http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index c4ead6f..1af6571 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.scheduler import scala.collection.mutable.{HashMap, SynchronizedMap} import scala.language.existentials +import org.apache.spark.streaming.util.WriteAheadLogUtils import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException} import org.apache.spark.rpc._ import org.apache.spark.streaming.{StreamingContext, Time} @@ -125,7 +126,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false) // Signal the receivers to delete old block data - if (ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) { + if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { logInfo(s"Cleanup old received batch data: $cleanupThreshTime") receiverInfo.values.flatMap { info => Option(info.endpoint) } .foreach { _.send(CleanupOldBlocks(cleanupThreshTime)) } http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala new file mode 100644 index 0000000..9985fed --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.util + +import java.nio.ByteBuffer +import java.util.{Iterator => JIterator} + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.language.postfixOps + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.util.ThreadUtils +import org.apache.spark.{Logging, SparkConf} + +/** + * This class manages write ahead log files. 
+ * - Writes records (bytebuffers) to periodically rotating log files. + * - Recovers the log files and the reads the recovered records upon failures. + * - Cleans up old log files. + * + * Uses [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]] to write + * and [[org.apache.spark.streaming.util.FileBasedWriteAheadLogReader]] to read. + * + * @param logDirectory Directory when rotating log files will be created. + * @param hadoopConf Hadoop configuration for reading/writing log files. + */ +private[streaming] class FileBasedWriteAheadLog( + conf: SparkConf, + logDirectory: String, + hadoopConf: Configuration, + rollingIntervalSecs: Int, + maxFailures: Int + ) extends WriteAheadLog with Logging { + + import FileBasedWriteAheadLog._ + + private val pastLogs = new ArrayBuffer[LogInfo] + private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("") + + private val threadpoolName = s"WriteAheadLogManager $callerNameTag" + implicit private val executionContext = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName)) + override protected val logName = s"WriteAheadLogManager $callerNameTag" + + private var currentLogPath: Option[String] = None + private var currentLogWriter: FileBasedWriteAheadLogWriter = null + private var currentLogWriterStartTime: Long = -1L + private var currentLogWriterStopTime: Long = -1L + + initializeOrRecover() + + /** + * Write a byte buffer to the log file. This method synchronously writes the data in the + * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed + * to HDFS, and will be available for readers to read. + */ + def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized { + var fileSegment: FileBasedWriteAheadLogSegment = null + var failures = 0 + var lastException: Exception = null + var succeeded = false + while (!succeeded && failures < maxFailures) { + try { + fileSegment = getLogWriter(time).write(byteBuffer) + succeeded = true + } catch { + case ex: Exception => + lastException = ex + logWarning("Failed to write to write ahead log") + resetWriter() + failures += 1 + } + } + if (fileSegment == null) { + logError(s"Failed to write to write ahead log after $failures failures") + throw lastException + } + fileSegment + } + + def read(segment: WriteAheadLogRecordHandle): ByteBuffer = { + val fileSegment = segment.asInstanceOf[FileBasedWriteAheadLogSegment] + var reader: FileBasedWriteAheadLogRandomReader = null + var byteBuffer: ByteBuffer = null + try { + reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf) + byteBuffer = reader.read(fileSegment) + } finally { + reader.close() + } + byteBuffer + } + + /** + * Read all the existing logs from the log directory. + * + * Note that this is typically called when the caller is initializing and wants + * to recover past state from the write ahead logs (that is, before making any writes). + * If this is called after writes have been made using this manager, then it may not return + * the latest the records. This does not deal with currently active log files, and + * hence the implementation is kept simple. 
+ */ + def readAll(): JIterator[ByteBuffer] = synchronized { + import scala.collection.JavaConversions._ + val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath + logInfo("Reading from the logs: " + logFilesToRead.mkString("\n")) + + logFilesToRead.iterator.map { file => + logDebug(s"Creating log reader with $file") + new FileBasedWriteAheadLogReader(file, hadoopConf) + } flatMap { x => x } + } + + /** + * Delete the log files that are older than the threshold time. + * + * Its important to note that the threshold time is based on the time stamps used in the log + * files, which is usually based on the local system time. So if there is coordination necessary + * between the node calculating the threshTime (say, driver node), and the local system time + * (say, worker node), the caller has to take account of possible time skew. + * + * If waitForCompletion is set to true, this method will return only after old logs have been + * deleted. This should be set to true only for testing. Else the files will be deleted + * asynchronously. + */ + def clean(threshTime: Long, waitForCompletion: Boolean): Unit = { + val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } } + logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " + + s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}") + + def deleteFiles() { + oldLogFiles.foreach { logInfo => + try { + val path = new Path(logInfo.path) + val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf) + fs.delete(path, true) + synchronized { pastLogs -= logInfo } + logDebug(s"Cleared log file $logInfo") + } catch { + case ex: Exception => + logWarning(s"Error clearing write ahead log file $logInfo", ex) + } + } + logInfo(s"Cleared log files in $logDirectory older than $threshTime") + } + if (!executionContext.isShutdown) { + val f = Future { deleteFiles() } + if (waitForCompletion) { + import scala.concurrent.duration._ + Await.ready(f, 1 second) + } + } + } + + + /** Stop the manager, close any open log writer */ + def close(): Unit = synchronized { + if (currentLogWriter != null) { + currentLogWriter.close() + } + executionContext.shutdown() + logInfo("Stopped write ahead log manager") + } + + /** Get the current log writer while taking care of rotation */ + private def getLogWriter(currentTime: Long): FileBasedWriteAheadLogWriter = synchronized { + if (currentLogWriter == null || currentTime > currentLogWriterStopTime) { + resetWriter() + currentLogPath.foreach { + pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _) + } + currentLogWriterStartTime = currentTime + currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000) + val newLogPath = new Path(logDirectory, + timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime)) + currentLogPath = Some(newLogPath.toString) + currentLogWriter = new FileBasedWriteAheadLogWriter(currentLogPath.get, hadoopConf) + } + currentLogWriter + } + + /** Initialize the log directory or recover existing logs inside the directory */ + private def initializeOrRecover(): Unit = synchronized { + val logDirectoryPath = new Path(logDirectory) + val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) + + if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { + val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath }) + pastLogs.clear() + pastLogs ++= logFileInfo + logInfo(s"Recovered ${logFileInfo.size} write ahead log 
files from $logDirectory") + logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}") + } + } + + private def resetWriter(): Unit = synchronized { + if (currentLogWriter != null) { + currentLogWriter.close() + currentLogWriter = null + } + } +} + +private[streaming] object FileBasedWriteAheadLog { + + case class LogInfo(startTime: Long, endTime: Long, path: String) + + val logFileRegex = """log-(\d+)-(\d+)""".r + + def timeToLogFile(startTime: Long, stopTime: Long): String = { + s"log-$startTime-$stopTime" + } + + def getCallerName(): Option[String] = { + val stackTraceClasses = Thread.currentThread.getStackTrace().map(_.getClassName) + stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split(".").lastOption) + } + + /** Convert a sequence of files to a sequence of sorted LogInfo objects */ + def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = { + files.flatMap { file => + logFileRegex.findFirstIn(file.getName()) match { + case Some(logFileRegex(startTimeStr, stopTimeStr)) => + val startTime = startTimeStr.toLong + val stopTime = stopTimeStr.toLong + Some(LogInfo(startTime, stopTime, file.toString)) + case None => + None + } + }.sortBy { _.startTime } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala new file mode 100644 index 0000000..f716822 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.util + +import java.io.Closeable +import java.nio.ByteBuffer + +import org.apache.hadoop.conf.Configuration + +/** + * A random access reader for reading write ahead log files written using + * [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]]. Given the file segment info, + * this reads the record (ByteBuffer) from the log file. 
+ */ +private[streaming] class FileBasedWriteAheadLogRandomReader(path: String, conf: Configuration) + extends Closeable { + + private val instream = HdfsUtils.getInputStream(path, conf) + private var closed = false + + def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized { + assertOpen() + instream.seek(segment.offset) + val nextLength = instream.readInt() + HdfsUtils.checkState(nextLength == segment.length, + s"Expected message length to be ${segment.length}, but was $nextLength") + val buffer = new Array[Byte](nextLength) + instream.readFully(buffer) + ByteBuffer.wrap(buffer) + } + + override def close(): Unit = synchronized { + closed = true + instream.close() + } + + private def assertOpen() { + HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.") + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala new file mode 100644 index 0000000..c3bb59f --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.util + +import java.io.{Closeable, EOFException} +import java.nio.ByteBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.Logging + +/** + * A reader for reading write ahead log files written using + * [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]]. This reads + * the records (bytebuffers) in the log file sequentially and return them as an + * iterator of bytebuffers. 
+ */ +private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Configuration) + extends Iterator[ByteBuffer] with Closeable with Logging { + + private val instream = HdfsUtils.getInputStream(path, conf) + private var closed = false + private var nextItem: Option[ByteBuffer] = None + + override def hasNext: Boolean = synchronized { + if (closed) { + return false + } + + if (nextItem.isDefined) { // handle the case where hasNext is called without calling next + true + } else { + try { + val length = instream.readInt() + val buffer = new Array[Byte](length) + instream.readFully(buffer) + nextItem = Some(ByteBuffer.wrap(buffer)) + logTrace("Read next item " + nextItem.get) + true + } catch { + case e: EOFException => + logDebug("Error reading next item, EOF reached", e) + close() + false + case e: Exception => + logWarning("Error while trying to read data from HDFS.", e) + close() + throw e + } + } + } + + override def next(): ByteBuffer = synchronized { + val data = nextItem.getOrElse { + close() + throw new IllegalStateException( + "next called without calling hasNext or after hasNext returned false") + } + nextItem = None // Ensure the next hasNext call loads new data. + data + } + + override def close(): Unit = synchronized { + if (!closed) { + instream.close() + } + closed = true + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala new file mode 100644 index 0000000..2e1f152 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.util + +/** Class for representing a segment of data in a write ahead log file */ +private[streaming] case class FileBasedWriteAheadLogSegment(path: String, offset: Long, length: Int) + extends WriteAheadLogRecordHandle http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala new file mode 100644 index 0000000..e146bec --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.util + +import java.io._ +import java.nio.ByteBuffer + +import scala.util.Try + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FSDataOutputStream + +/** + * A writer for writing byte-buffers to a write ahead log file. 
+ */ +private[streaming] class FileBasedWriteAheadLogWriter(path: String, hadoopConf: Configuration) + extends Closeable { + + private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf) + + private lazy val hadoopFlushMethod = { + // Use reflection to get the right flush operation + val cls = classOf[FSDataOutputStream] + Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption + } + + private var nextOffset = stream.getPos() + private var closed = false + + /** Write the bytebuffer to the log file */ + def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized { + assertOpen() + data.rewind() // Rewind to ensure all data in the buffer is retrieved + val lengthToWrite = data.remaining() + val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite) + stream.writeInt(lengthToWrite) + if (data.hasArray) { + stream.write(data.array()) + } else { + // If the buffer is not backed by an array, we transfer using temp array + // Note that despite the extra array copy, this should be faster than byte-by-byte copy + while (data.hasRemaining) { + val array = new Array[Byte](data.remaining) + data.get(array) + stream.write(array) + } + } + flush() + nextOffset = stream.getPos() + segment + } + + override def close(): Unit = synchronized { + closed = true + stream.close() + } + + private def flush() { + hadoopFlushMethod.foreach { _.invoke(stream) } + // Useful for local file system where hflush/sync does not work (HADOOP-7844) + stream.getWrappedStream.flush() + } + + private def assertOpen() { + HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.") + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala deleted file mode 100644 index 1005a2c..0000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.streaming.util - -/** Class for representing a segment of data in a write ahead log file */ -private[streaming] case class WriteAheadLogFileSegment (path: String, offset: Long, length: Int) http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala deleted file mode 100644 index 38a93cc..0000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.util - -import java.nio.ByteBuffer - -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.language.postfixOps - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.spark.Logging -import org.apache.spark.util.{ThreadUtils, Clock, SystemClock} -import WriteAheadLogManager._ - -/** - * This class manages write ahead log files. - * - Writes records (bytebuffers) to periodically rotating log files. - * - Recovers the log files and the reads the recovered records upon failures. - * - Cleans up old log files. - * - * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write - * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read. - * - * @param logDirectory Directory when rotating log files will be created. - * @param hadoopConf Hadoop configuration for reading/writing log files. - * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over. - * Default is one minute. - * @param maxFailures Max number of failures that is tolerated for every attempt to write to log. - * Default is three. - * @param callerName Optional name of the class who is using this manager. - * @param clock Optional clock that is used to check for rotation interval. 
- */ -private[streaming] class WriteAheadLogManager( - logDirectory: String, - hadoopConf: Configuration, - rollingIntervalSecs: Int = 60, - maxFailures: Int = 3, - callerName: String = "", - clock: Clock = new SystemClock - ) extends Logging { - - private val pastLogs = new ArrayBuffer[LogInfo] - private val callerNameTag = - if (callerName.nonEmpty) s" for $callerName" else "" - private val threadpoolName = s"WriteAheadLogManager $callerNameTag" - implicit private val executionContext = ExecutionContext.fromExecutorService( - ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName)) - override protected val logName = s"WriteAheadLogManager $callerNameTag" - - private var currentLogPath: Option[String] = None - private var currentLogWriter: WriteAheadLogWriter = null - private var currentLogWriterStartTime: Long = -1L - private var currentLogWriterStopTime: Long = -1L - - initializeOrRecover() - - /** - * Write a byte buffer to the log file. This method synchronously writes the data in the - * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed - * to HDFS, and will be available for readers to read. - */ - def writeToLog(byteBuffer: ByteBuffer): WriteAheadLogFileSegment = synchronized { - var fileSegment: WriteAheadLogFileSegment = null - var failures = 0 - var lastException: Exception = null - var succeeded = false - while (!succeeded && failures < maxFailures) { - try { - fileSegment = getLogWriter(clock.getTimeMillis()).write(byteBuffer) - succeeded = true - } catch { - case ex: Exception => - lastException = ex - logWarning("Failed to write to write ahead log") - resetWriter() - failures += 1 - } - } - if (fileSegment == null) { - logError(s"Failed to write to write ahead log after $failures failures") - throw lastException - } - fileSegment - } - - /** - * Read all the existing logs from the log directory. - * - * Note that this is typically called when the caller is initializing and wants - * to recover past state from the write ahead logs (that is, before making any writes). - * If this is called after writes have been made using this manager, then it may not return - * the latest the records. This does not deal with currently active log files, and - * hence the implementation is kept simple. - */ - def readFromLog(): Iterator[ByteBuffer] = synchronized { - val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath - logInfo("Reading from the logs: " + logFilesToRead.mkString("\n")) - logFilesToRead.iterator.map { file => - logDebug(s"Creating log reader with $file") - new WriteAheadLogReader(file, hadoopConf) - } flatMap { x => x } - } - - /** - * Delete the log files that are older than the threshold time. - * - * Its important to note that the threshold time is based on the time stamps used in the log - * files, which is usually based on the local system time. So if there is coordination necessary - * between the node calculating the threshTime (say, driver node), and the local system time - * (say, worker node), the caller has to take account of possible time skew. - * - * If waitForCompletion is set to true, this method will return only after old logs have been - * deleted. This should be set to true only for testing. Else the files will be deleted - * asynchronously. 
- */ - def cleanupOldLogs(threshTime: Long, waitForCompletion: Boolean): Unit = { - val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } } - logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " + - s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}") - - def deleteFiles() { - oldLogFiles.foreach { logInfo => - try { - val path = new Path(logInfo.path) - val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf) - fs.delete(path, true) - synchronized { pastLogs -= logInfo } - logDebug(s"Cleared log file $logInfo") - } catch { - case ex: Exception => - logWarning(s"Error clearing write ahead log file $logInfo", ex) - } - } - logInfo(s"Cleared log files in $logDirectory older than $threshTime") - } - if (!executionContext.isShutdown) { - val f = Future { deleteFiles() } - if (waitForCompletion) { - import scala.concurrent.duration._ - Await.ready(f, 1 second) - } - } - } - - - /** Stop the manager, close any open log writer */ - def stop(): Unit = synchronized { - if (currentLogWriter != null) { - currentLogWriter.close() - } - executionContext.shutdown() - logInfo("Stopped write ahead log manager") - } - - /** Get the current log writer while taking care of rotation */ - private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized { - if (currentLogWriter == null || currentTime > currentLogWriterStopTime) { - resetWriter() - currentLogPath.foreach { - pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _) - } - currentLogWriterStartTime = currentTime - currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000) - val newLogPath = new Path(logDirectory, - timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime)) - currentLogPath = Some(newLogPath.toString) - currentLogWriter = new WriteAheadLogWriter(currentLogPath.get, hadoopConf) - } - currentLogWriter - } - - /** Initialize the log directory or recover existing logs inside the directory */ - private def initializeOrRecover(): Unit = synchronized { - val logDirectoryPath = new Path(logDirectory) - val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) - - if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { - val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath }) - pastLogs.clear() - pastLogs ++= logFileInfo - logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory") - logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}") - } - } - - private def resetWriter(): Unit = synchronized { - if (currentLogWriter != null) { - currentLogWriter.close() - currentLogWriter = null - } - } -} - -private[util] object WriteAheadLogManager { - - case class LogInfo(startTime: Long, endTime: Long, path: String) - - val logFileRegex = """log-(\d+)-(\d+)""".r - - def timeToLogFile(startTime: Long, stopTime: Long): String = { - s"log-$startTime-$stopTime" - } - - /** Convert a sequence of files to a sequence of sorted LogInfo objects */ - def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = { - files.flatMap { file => - logFileRegex.findFirstIn(file.getName()) match { - case Some(logFileRegex(startTimeStr, stopTimeStr)) => - val startTime = startTimeStr.toLong - val stopTime = stopTimeStr.toLong - Some(LogInfo(startTime, stopTime, file.toString)) - case None => - None - } - }.sortBy { _.startTime } - } -} 
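The rolling interval and retry count that were constructor parameters of the deleted WriteAheadLogManager above are, after this change, read from SparkConf by the new WriteAheadLogUtils further down in this diff. A minimal sketch of the equivalent configuration (receiver side shown; the values are simply the old defaults of 60 seconds and 3 failures, and the driver-side keys use the "spark.streaming.driver.writeAheadLog." prefix):

    import org.apache.spark.SparkConf

    // Illustrative only: the rotation and retry knobs formerly passed to
    // WriteAheadLogManager's constructor are now picked up from these keys.
    val sparkConf = new SparkConf()
      .set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "60") // old default: 60
      .set("spark.streaming.receiver.writeAheadLog.maxFailures", "3")          // old default: 3
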
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala deleted file mode 100644 index 0039890..0000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.util - -import java.io.Closeable -import java.nio.ByteBuffer - -import org.apache.hadoop.conf.Configuration - -/** - * A random access reader for reading write ahead log files written using - * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. Given the file segment info, - * this reads the record (bytebuffer) from the log file. - */ -private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration) - extends Closeable { - - private val instream = HdfsUtils.getInputStream(path, conf) - private var closed = false - - def read(segment: WriteAheadLogFileSegment): ByteBuffer = synchronized { - assertOpen() - instream.seek(segment.offset) - val nextLength = instream.readInt() - HdfsUtils.checkState(nextLength == segment.length, - s"Expected message length to be ${segment.length}, but was $nextLength") - val buffer = new Array[Byte](nextLength) - instream.readFully(buffer) - ByteBuffer.wrap(buffer) - } - - override def close(): Unit = synchronized { - closed = true - instream.close() - } - - private def assertOpen() { - HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.") - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala deleted file mode 100644 index 2afc0d1..0000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.util - -import java.io.{Closeable, EOFException} -import java.nio.ByteBuffer - -import org.apache.hadoop.conf.Configuration -import org.apache.spark.Logging - -/** - * A reader for reading write ahead log files written using - * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads - * the records (bytebuffers) in the log file sequentially and return them as an - * iterator of bytebuffers. - */ -private[streaming] class WriteAheadLogReader(path: String, conf: Configuration) - extends Iterator[ByteBuffer] with Closeable with Logging { - - private val instream = HdfsUtils.getInputStream(path, conf) - private var closed = false - private var nextItem: Option[ByteBuffer] = None - - override def hasNext: Boolean = synchronized { - if (closed) { - return false - } - - if (nextItem.isDefined) { // handle the case where hasNext is called without calling next - true - } else { - try { - val length = instream.readInt() - val buffer = new Array[Byte](length) - instream.readFully(buffer) - nextItem = Some(ByteBuffer.wrap(buffer)) - logTrace("Read next item " + nextItem.get) - true - } catch { - case e: EOFException => - logDebug("Error reading next item, EOF reached", e) - close() - false - case e: Exception => - logWarning("Error while trying to read data from HDFS.", e) - close() - throw e - } - } - } - - override def next(): ByteBuffer = synchronized { - val data = nextItem.getOrElse { - close() - throw new IllegalStateException( - "next called without calling hasNext or after hasNext returned false") - } - nextItem = None // Ensure the next hasNext call loads new data. - data - } - - override def close(): Unit = synchronized { - if (!closed) { - instream.close() - } - closed = true - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala new file mode 100644 index 0000000..7f6ff12 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.util + +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SparkConf, SparkException} + +/** A helper class with utility functions related to the WriteAheadLog interface */ +private[streaming] object WriteAheadLogUtils extends Logging { + val RECEIVER_WAL_ENABLE_CONF_KEY = "spark.streaming.receiver.writeAheadLog.enable" + val RECEIVER_WAL_CLASS_CONF_KEY = "spark.streaming.receiver.writeAheadLog.class" + val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY = + "spark.streaming.receiver.writeAheadLog.rollingIntervalSecs" + val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.receiver.writeAheadLog.maxFailures" + + val DRIVER_WAL_CLASS_CONF_KEY = "spark.streaming.driver.writeAheadLog.class" + val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY = + "spark.streaming.driver.writeAheadLog.rollingIntervalSecs" + val DRIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.driver.writeAheadLog.maxFailures" + + val DEFAULT_ROLLING_INTERVAL_SECS = 60 + val DEFAULT_MAX_FAILURES = 3 + + def enableReceiverLog(conf: SparkConf): Boolean = { + conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false) + } + + def getRollingIntervalSecs(conf: SparkConf, isDriver: Boolean): Int = { + if (isDriver) { + conf.getInt(DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY, DEFAULT_ROLLING_INTERVAL_SECS) + } else { + conf.getInt(RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY, DEFAULT_ROLLING_INTERVAL_SECS) + } + } + + def getMaxFailures(conf: SparkConf, isDriver: Boolean): Int = { + if (isDriver) { + conf.getInt(DRIVER_WAL_MAX_FAILURES_CONF_KEY, DEFAULT_MAX_FAILURES) + } else { + conf.getInt(RECEIVER_WAL_MAX_FAILURES_CONF_KEY, DEFAULT_MAX_FAILURES) + } + } + + /** + * Create a WriteAheadLog for the driver. If configured with custom WAL class, it will try + * to create instance of that class, otherwise it will create the default FileBasedWriteAheadLog. + */ + def createLogForDriver( + sparkConf: SparkConf, + fileWalLogDirectory: String, + fileWalHadoopConf: Configuration + ): WriteAheadLog = { + createLog(true, sparkConf, fileWalLogDirectory, fileWalHadoopConf) + } + + /** + * Create a WriteAheadLog for the receiver. If configured with custom WAL class, it will try + * to create instance of that class, otherwise it will create the default FileBasedWriteAheadLog. + */ + def createLogForReceiver( + sparkConf: SparkConf, + fileWalLogDirectory: String, + fileWalHadoopConf: Configuration + ): WriteAheadLog = { + createLog(false, sparkConf, fileWalLogDirectory, fileWalHadoopConf) + } + + /** + * Create a WriteAheadLog based on the value of the given config key. The config key is used + * to get the class name from the SparkConf. If the class is configured, it will try to + * create instance of that class by first trying `new CustomWAL(sparkConf, logDir)` then trying + * `new CustomWAL(sparkConf)`. If either fails, it will fail. If no class is configured, then + * it will create the default FileBasedWriteAheadLog. 
+ */ + private def createLog( + isDriver: Boolean, + sparkConf: SparkConf, + fileWalLogDirectory: String, + fileWalHadoopConf: Configuration + ): WriteAheadLog = { + + val classNameOption = if (isDriver) { + sparkConf.getOption(DRIVER_WAL_CLASS_CONF_KEY) + } else { + sparkConf.getOption(RECEIVER_WAL_CLASS_CONF_KEY) + } + classNameOption.map { className => + try { + instantiateClass( + Utils.classForName(className).asInstanceOf[Class[_ <: WriteAheadLog]], sparkConf) + } catch { + case NonFatal(e) => + throw new SparkException(s"Could not create a write ahead log of class $className", e) + } + }.getOrElse { + new FileBasedWriteAheadLog(sparkConf, fileWalLogDirectory, fileWalHadoopConf, + getRollingIntervalSecs(sparkConf, isDriver), getMaxFailures(sparkConf, isDriver)) + } + } + + /** Instantiate the class, either using single arg constructor or zero arg constructor */ + private def instantiateClass(cls: Class[_ <: WriteAheadLog], conf: SparkConf): WriteAheadLog = { + try { + cls.getConstructor(classOf[SparkConf]).newInstance(conf) + } catch { + case nsme: NoSuchMethodException => + cls.getConstructor().newInstance() + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala deleted file mode 100644 index 679f6a6..0000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.util - -import java.io._ -import java.net.URI -import java.nio.ByteBuffer - -import scala.util.Try - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem} - -/** - * A writer for writing byte-buffers to a write ahead log file. 
- */ -private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration) - extends Closeable { - - private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf) - - private lazy val hadoopFlushMethod = { - // Use reflection to get the right flush operation - val cls = classOf[FSDataOutputStream] - Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption - } - - private var nextOffset = stream.getPos() - private var closed = false - - /** Write the bytebuffer to the log file */ - def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized { - assertOpen() - data.rewind() // Rewind to ensure all data in the buffer is retrieved - val lengthToWrite = data.remaining() - val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite) - stream.writeInt(lengthToWrite) - if (data.hasArray) { - stream.write(data.array()) - } else { - // If the buffer is not backed by an array, we transfer using temp array - // Note that despite the extra array copy, this should be faster than byte-by-byte copy - while (data.hasRemaining) { - val array = new Array[Byte](data.remaining) - data.get(array) - stream.write(array) - } - } - flush() - nextOffset = stream.getPos() - segment - } - - override def close(): Unit = synchronized { - closed = true - stream.close() - } - - private def flush() { - hadoopFlushMethod.foreach { _.invoke(stream) } - // Useful for local file system where hflush/sync does not work (HADOOP-7844) - stream.getWrappedStream.flush() - } - - private def assertOpen() { - HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.") - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java ---------------------------------------------------------------------- diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java new file mode 100644 index 0000000..50e8f9f --- /dev/null +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming; + +import java.util.ArrayList; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.Transformer; +import org.apache.spark.SparkConf; +import org.apache.spark.streaming.util.WriteAheadLog; +import org.apache.spark.streaming.util.WriteAheadLogRecordHandle; +import org.apache.spark.streaming.util.WriteAheadLogUtils; + +import org.junit.Test; +import org.junit.Assert; + +class JavaWriteAheadLogSuiteHandle extends WriteAheadLogRecordHandle { + int index = -1; + public JavaWriteAheadLogSuiteHandle(int idx) { + index = idx; + } +} + +public class JavaWriteAheadLogSuite extends WriteAheadLog { + + class Record { + long time; + int index; + ByteBuffer buffer; + + public Record(long tym, int idx, ByteBuffer buf) { + index = idx; + time = tym; + buffer = buf; + } + } + private int index = -1; + private ArrayList<Record> records = new ArrayList<Record>(); + + + // Methods for WriteAheadLog + @Override + public WriteAheadLogRecordHandle write(java.nio.ByteBuffer record, long time) { + index += 1; + records.add(new org.apache.spark.streaming.JavaWriteAheadLogSuite.Record(time, index, record)); + return new JavaWriteAheadLogSuiteHandle(index); + } + + @Override + public java.nio.ByteBuffer read(WriteAheadLogRecordHandle handle) { + if (handle instanceof JavaWriteAheadLogSuiteHandle) { + int reqdIndex = ((JavaWriteAheadLogSuiteHandle) handle).index; + for (Record record: records) { + if (record.index == reqdIndex) { + return record.buffer; + } + } + } + return null; + } + + @Override + public java.util.Iterator<java.nio.ByteBuffer> readAll() { + Collection<ByteBuffer> buffers = CollectionUtils.collect(records, new Transformer() { + @Override + public Object transform(Object input) { + return ((Record) input).buffer; + } + }); + return buffers.iterator(); + } + + @Override + public void clean(long threshTime, boolean waitForCompletion) { + for (int i = 0; i < records.size(); i++) { + if (records.get(i).time < threshTime) { + records.remove(i); + i--; + } + } + } + + @Override + public void close() { + records.clear(); + } + + @Test + public void testCustomWAL() { + SparkConf conf = new SparkConf(); + conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName()); + WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null); + + String data1 = "data1"; + WriteAheadLogRecordHandle handle = wal.write(ByteBuffer.wrap(data1.getBytes()), 1234); + Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle); + Assert.assertTrue(new String(wal.read(handle).array()).equals(data1)); + + wal.write(ByteBuffer.wrap("data2".getBytes()), 1235); + wal.write(ByteBuffer.wrap("data3".getBytes()), 1236); + wal.write(ByteBuffer.wrap("data4".getBytes()), 1237); + wal.clean(1236, false); + + java.util.Iterator<java.nio.ByteBuffer> dataIterator = wal.readAll(); + ArrayList<String> readData = new ArrayList<String>(); + while (dataIterator.hasNext()) { + readData.add(new String(dataIterator.next().array())); + } + Assert.assertTrue(readData.equals(Arrays.asList("data3", "data4"))); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index c090eae..2380423 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -43,7 +43,7 @@ import WriteAheadLogSuite._ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matchers with Logging { - val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1") + val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1") val hadoopConf = new Configuration() val storageLevel = StorageLevel.MEMORY_ONLY_SER val streamId = 1 @@ -130,10 +130,13 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche "Unexpected store result type" ) // Verify the data in write ahead log files is correct - val fileSegments = storeResults.map { _.asInstanceOf[WriteAheadLogBasedStoreResult].segment} - val loggedData = fileSegments.flatMap { segment => - val reader = new WriteAheadLogRandomReader(segment.path, hadoopConf) - val bytes = reader.read(segment) + val walSegments = storeResults.map { result => + result.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle + } + val loggedData = walSegments.flatMap { walSegment => + val fileSegment = walSegment.asInstanceOf[FileBasedWriteAheadLogSegment] + val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf) + val bytes = reader.read(fileSegment) reader.close() blockManager.dataDeserialize(generateBlockId(), bytes).toList } @@ -148,13 +151,13 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche } } - test("WriteAheadLogBasedBlockHandler - cleanup old blocks") { + test("WriteAheadLogBasedBlockHandler - clean old blocks") { withWriteAheadLogBasedBlockHandler { handler => val blocks = Seq.tabulate(10) { i => IteratorBlock(Iterator(1 to i)) } storeBlocks(handler, blocks) val preCleanupLogFiles = getWriteAheadLogFiles() - preCleanupLogFiles.size should be > 1 + require(preCleanupLogFiles.size > 1) // this depends on the number of blocks inserted using generateAndStoreData() manualClock.getTimeMillis() shouldEqual 5000L @@ -218,6 +221,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche /** Instantiate a WriteAheadLogBasedBlockHandler and run a code with it */ private def withWriteAheadLogBasedBlockHandler(body: WriteAheadLogBasedBlockHandler => Unit) { + require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = false) === 1) val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, 1, storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock) try {
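Taken together, the new WriteAheadLogUtils and the JavaWriteAheadLogSuite above define how an application opts in to a custom write ahead log: set the class name under the driver and/or receiver class key, and make sure the class extends WriteAheadLog and exposes either a single-SparkConf-argument constructor or a zero-argument constructor (this is what instantiateClass() tries, in that order). A minimal sketch of the user-facing configuration follows; the class name com.example.MyWriteAheadLog is hypothetical and only stands in for a real implementation on the classpath:

    import org.apache.spark.SparkConf

    // Hypothetical custom WAL class; it must extend
    // org.apache.spark.streaming.util.WriteAheadLog and provide either a
    // (SparkConf) constructor or a no-arg constructor.
    val conf = new SparkConf()
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      .set("spark.streaming.driver.writeAheadLog.class", "com.example.MyWriteAheadLog")
      .set("spark.streaming.receiver.writeAheadLog.class", "com.example.MyWriteAheadLog")

If neither class key is set, createLogForDriver and createLogForReceiver fall back to the default FileBasedWriteAheadLog, which writes rotating log files to the given log directory as the removed WriteAheadLogManager did.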
