[ 
https://issues.apache.org/jira/browse/SPARK-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-5829.
------------------------------
    Resolution: Duplicate

Same as SPARK-3228, which is WontFix. The behavior is intended: the streaming 
output operations write a result for every batch interval, whether or not the 
batch's RDD is empty. You can copy the saveAs* functions and modify them to 
skip empty RDDs and get the behavior you want fairly easily.
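As a rough sketch of that suggestion (this is an assumption about how one might do it, not code from Spark itself): instead of calling saveAsNewAPIHadoopFiles, use foreachRDD and only save batches that contain data, mirroring the per-batch output path that saveAsNewAPIHadoopFiles would produce.

```java
// Sketch only, assuming the Spark 1.3-era Java streaming API. Replaces the
// rs.saveAsNewAPIHadoopFiles(...) call in the reporter's example.
rs.foreachRDD(new Function2<JavaPairRDD<Integer, Integer>, Time, Void>() {
    @Override
    public Void call(JavaPairRDD<Integer, Integer> rdd, Time time) throws Exception {
        // count() triggers a Spark job; rdd.take(1).isEmpty() is cheaper
        // for large RDDs if you only need an emptiness check.
        if (rdd.count() > 0) {
            // Hypothetical naming scheme modeled on saveAsNewAPIHadoopFiles
            // (prefix-<batch time in ms>.suffix).
            String path = "/testspark/resultdir/output-" + time.milliseconds() + ".suffix";
            rdd.saveAsNewAPIHadoopFile(path, Integer.class, Integer.class,
                    TextOutputFormat.class);
        }
        return null; // Function2<..., Void> requires an explicit null return
    }
});
```

This skips the empty per-batch output directories entirely, at the cost of one extra action (the emptiness check) per batch.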

> JavaStreamingContext.fileStream runs repeated empty tasks when no new files 
> are found
> ------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5829
>                 URL: https://issues.apache.org/jira/browse/SPARK-5829
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.1
>         Environment: spark master (1.3.0) with SPARK-5826 patch.
>            Reporter: Littlestar
>            Priority: Minor
>
> spark master (1.3.0) with SPARK-5826 patch.
> JavaStreamingContext.fileStream runs repeated empty tasks when there are no new files.
> To reproduce:
>   1. mkdir /testspark/watchdir on HDFS.
>   2. Run the app below.
>   3. Put some text files into /testspark/watchdir.
> Every 30 seconds the Spark log indicates that a new sub-task runs, and
> /testspark/resultdir/ gains a new directory containing empty files.
> Even when no new files are added, a new task still runs with an empty RDD.
> {noformat}
> package my.test.hadoop.spark;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.api.java.function.Function2;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> import scala.Tuple2;
>
> public class TestStream {
>     @SuppressWarnings({ "serial", "resource" })
>     public static void main(String[] args) throws Exception {
>         SparkConf conf = new SparkConf().setAppName("TestStream");
>         JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
>         jssc.checkpoint("/testspark/checkpointdir");
>
>         Configuration jobConf = new Configuration();
>         jobConf.set("my.test.fields", "fields");
>
>         JavaPairDStream<Integer, Integer> is = jssc.fileStream("/testspark/watchdir",
>                 LongWritable.class, Text.class, TextInputFormat.class,
>                 new Function<Path, Boolean>() {
>                     @Override
>                     public Boolean call(Path v1) throws Exception {
>                         return true;
>                     }
>                 }, true, jobConf)
>                 .mapToPair(new PairFunction<Tuple2<LongWritable, Text>, Integer, Integer>() {
>                     @Override
>                     public Tuple2<Integer, Integer> call(Tuple2<LongWritable, Text> arg0)
>                             throws Exception {
>                         return new Tuple2<Integer, Integer>(1, 1);
>                     }
>                 });
>
>         JavaPairDStream<Integer, Integer> rs = is.reduceByKey(
>                 new Function2<Integer, Integer, Integer>() {
>                     @Override
>                     public Integer call(Integer arg0, Integer arg1) throws Exception {
>                         return arg0 + arg1;
>                     }
>                 });
>
>         rs.checkpoint(Durations.seconds(60));
>         rs.saveAsNewAPIHadoopFiles("/testspark/resultdir/output", "suffix",
>                 Integer.class, Integer.class, TextOutputFormat.class);
>
>         jssc.start();
>         jssc.awaitTermination();
>     }
> }
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
