1. Be careful: HDFS works better with large files than with lots of small files.

2. If that's really what you want, roll your own:

import java.io.{BufferedWriter, OutputStreamWriter}
import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def writeLines(iterator: Iterator[(String, String)]) = {
  val writers = new mutable.HashMap[String, BufferedWriter] // (key, writer) map
  try {
    while (iterator.hasNext) {
      val (key, line) = iterator.next()
      val writer = writers.getOrElseUpdate(key, {
        // open one output file per key; args(1) is the output directory prefix
        val path = args(1) + key
        val outputStream = FileSystem.get(new Configuration()).create(new Path(path))
        new BufferedWriter(new OutputStreamWriter(outputStream))
      })
      writer.write(line)
      writer.newLine()
    }
  } finally {
    writers.values.foreach(_.close())
  }
}
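
A minimal local smoke test, just to see the per-key files appear (hypothetical keys and lines; it assumes args(1) is in scope and points at a writable output prefix, and that the default Hadoop Configuration resolves to a usable filesystem):

// hypothetical sample: three lines under two keys -> two output files
val sample = Iterator(
  ("2014-08-11", "first line"),
  ("2014-08-12", "second line"),
  ("2014-08-11", "third line"))
writeLines(sample)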

val inputData = sc.textFile(args(0)) // assuming args(0) is the input path
val keyValue = inputData.map(line => (extractKey(line), line)) // extractKey: your function that derives the key (e.g. the create date) from a line
val partitions = keyValue.partitionBy(new MyPartitioner(10))
partitions.foreachPartition(writeLines)
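
extractKey above is whatever fits your data; for instance, if every raw line starts with a yyyy-MM-dd date, a simple (hypothetical) version could be:

// hypothetical: key each line by its leading date, e.g. "2014-08-11 ..." -> "2014-08-11"
def extractKey(line: String): String = line.take(10)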


import org.apache.spark.Partitioner

class MyPartitioner(partitions: Int) extends Partitioner {
    override def numPartitions: Int = partitions

    override def getPartition(key: Any): Int = {
        // make sure lines with the same key land in the same partition
        (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions
    }
}
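
A quick sanity check of the partitioner in plain Scala (no cluster needed), confirming that equal keys always go to the same partition:

val p = new MyPartitioner(10)
val a = p.getPartition("2014-08-11")
val b = p.getPartition("2014-08-11")
assert(a == b) // same key -> same partition, so one date's lines stay together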



2014-08-11 20:42 GMT+08:00 诺铁 <noty...@gmail.com>:

> hi,
>
> I have googled and found a similar question without a good answer:
> http://stackoverflow.com/questions/24520225/writing-to-hadoop-distributed-file-system-multiple-times-with-spark
>
> In short, I would like to separate the raw data and divide it by some key,
> for example the create date, and put it in a directory named by that date,
> so that I can easily access a portion of the data later.
>
> For now I have to extract all keys, then filter by each key and save to a
> file repeatedly. Is there a good way to do this? Or maybe I shouldn't do
> such a thing?
