[
https://issues.apache.org/jira/browse/SPARK-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sean Owen resolved SPARK-5829.
------------------------------
Resolution: Duplicate
Same as SPARK-3228, which was resolved as Won't Fix. The behavior is intended. You
can copy the saveAs* functions and modify them to get the behavior you want
fairly easily.
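As a sketch of that workaround (not Spark's own API: the `count() > 0` guard and the output-path format are my own choices, and this mirrors only roughly what saveAsNewAPIHadoopFiles does internally), the saveAsNewAPIHadoopFiles call in the reproduction below could be replaced with a foreachRDD that skips empty batches:

```java
// Sketch only: requires the additional imports
// org.apache.spark.api.java.JavaPairRDD and org.apache.spark.streaming.Time.
rs.foreachRDD(new Function2<JavaPairRDD<Integer, Integer>, Time, Void>() {
    @Override
    public Void call(JavaPairRDD<Integer, Integer> rdd, Time time) throws Exception {
        // Only write output when the batch actually contains records.
        // count() launches a job; a cheaper emptiness check could be used instead.
        if (rdd.count() > 0) {
            // Hypothetical path format, analogous to what the prefix/suffix
            // arguments of saveAsNewAPIHadoopFiles would produce.
            String path = "/testspark/resultdir/output-" + time.milliseconds() + ".suffix";
            rdd.saveAsNewAPIHadoopFile(path, Integer.class, Integer.class,
                    TextOutputFormat.class);
        }
        return null;
    }
});
```

This keeps the streaming job running every batch interval (which is the intended behavior) but avoids creating a new directory of empty files for batches with no new input.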
> JavaStreamingContext.fileStream runs repeated empty tasks when no more
> new files are found
> ------------------------------------------------------------------------------------------
>
> Key: SPARK-5829
> URL: https://issues.apache.org/jira/browse/SPARK-5829
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.2.1
> Environment: spark master (1.3.0) with SPARK-5826 patch.
> Reporter: Littlestar
> Priority: Minor
>
> spark master (1.3.0) with SPARK-5826 patch.
> JavaStreamingContext.fileStream runs repeated empty tasks when there are no more new files.
> Reproduce:
> 1. mkdir /testspark/watchdir on HDFS.
> 2. Run the app.
> 3. Put some text files into /testspark/watchdir.
> Every 30 seconds, the Spark log indicates that a new sub-task runs,
> and /testspark/resultdir/ gets a new directory with empty files every 30 seconds.
> Even when no new files are added, it runs a new task with an empty RDD.
> {noformat}
> package my.test.hadoop.spark;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.api.java.function.Function2;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> import scala.Tuple2;
>
> public class TestStream {
>     @SuppressWarnings({ "serial", "resource" })
>     public static void main(String[] args) throws Exception {
>         SparkConf conf = new SparkConf().setAppName("TestStream");
>         JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
>         jssc.checkpoint("/testspark/checkpointdir");
>
>         Configuration jobConf = new Configuration();
>         jobConf.set("my.test.fields", "fields");
>
>         JavaPairDStream<Integer, Integer> is = jssc
>                 .fileStream("/testspark/watchdir", LongWritable.class, Text.class,
>                         TextInputFormat.class, new Function<Path, Boolean>() {
>                             @Override
>                             public Boolean call(Path v1) throws Exception {
>                                 return true;
>                             }
>                         }, true, jobConf)
>                 .mapToPair(new PairFunction<Tuple2<LongWritable, Text>, Integer, Integer>() {
>                     @Override
>                     public Tuple2<Integer, Integer> call(Tuple2<LongWritable, Text> arg0)
>                             throws Exception {
>                         return new Tuple2<Integer, Integer>(1, 1);
>                     }
>                 });
>
>         JavaPairDStream<Integer, Integer> rs = is.reduceByKey(
>                 new Function2<Integer, Integer, Integer>() {
>                     @Override
>                     public Integer call(Integer arg0, Integer arg1) throws Exception {
>                         return arg0 + arg1;
>                     }
>                 });
>
>         rs.checkpoint(Durations.seconds(60));
>         rs.saveAsNewAPIHadoopFiles("/testspark/resultdir/output", "suffix",
>                 Integer.class, Integer.class, TextOutputFormat.class);
>
>         jssc.start();
>         jssc.awaitTermination();
>     }
> }
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]