[ https://issues.apache.org/jira/browse/HADOOP-16021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732467#comment-16732467 ]

Steve Loughran commented on HADOOP-16021:
-----------------------------------------

Inline stack of the NPE:
{code}
18/12/28 11:43:31 ERROR Executor: Exception in task 0.0 in stage 23.0 (TID 23)
java.lang.NullPointerException
        at org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1119)
        at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:273)
        at com.xxx.algo.hadoop.Writer1.call(Writer1.java:68)
        at com.xxx.algo.hadoop.Writer1.call(Writer1.java:34)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}

And the "Not supported" IOException when a compression type is set:
{code}

2018-11-01 20:55:20 ERROR Executor:91 - Exception in task 0.0 in stage 11.0 (TID 11)
java.io.IOException: Not supported
        at org.apache.hadoop.fs.ChecksumFileSystem.append(ChecksumFileSystem.java:357)
        at org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1131)
        at org.apache.hadoop.io.SequenceFile$BlockCompressWriter.<init>(SequenceFile.java:1511)
        at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:277)
        at com.xxx.algo.hadoop.Writer1.call(Writer1.java:68)
        at com.xxx.algo.hadoop.Writer1.call(Writer1.java:34)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}
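
Reading the two traces together (my interpretation, so treat it as a hypothesis): the NPE comes out of the appendIfExists path in the SequenceFile.Writer constructor, which compares the existing file's compression settings against the requested ones; for an uncompressed file the existing codec is null, so that comparison blows up. The "Not supported" IOException is a different problem: the stack goes through ChecksumFileSystem.append, and the default LocalFileSystem extends ChecksumFileSystem, which does not implement append at all. A quick way to probe the local-FS half is to bind the file: scheme to the raw, checksum-free local filesystem via the standard fs.<scheme>.impl mechanism; a minimal sketch, unverified as a workaround:
{code:java}
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class RawLocalCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Bypass ChecksumFileSystem (whose append() throws "Not supported")
    // by binding the file: scheme to the raw local filesystem.
    conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
    FileSystem fs = FileSystem.get(URI.create("file:///"), conf);
    System.out.println(fs.getClass().getName()); // expect RawLocalFileSystem
  }
}
{code}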


> SequenceFile.createWriter appendIfExists codec cause NullPointerException
> -------------------------------------------------------------------------
>
>                 Key: HADOOP-16021
>                 URL: https://issues.apache.org/jira/browse/HADOOP-16021
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: common
>    Affects Versions: 2.7.3
>         Environment: windows10 or Linux-centos , hadoop2.7.3, jdk8
>            Reporter: asin
>            Priority: Major
>              Labels: bug
>         Attachments: 055.png, 62.png, CompressionType.BLOCK-Not supported-error log.txt, CompressionType.NONE-NullPointerException-error log.txt
>
>
> I want to append data to an existing file. When I use SequenceFile.Writer.appendIfExists, it throws a NullPointerException at org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1119).
> When I remove the appendIfExists option it works, but it overwrites the old file.
>
> When I try CompressionType.RECORD or CompressionType.BLOCK instead, it throws a "Not supported" exception.
>  
> {code:java}
> // my code
> SequenceFile.Writer writer = SequenceFile.createWriter(conf,
>     SequenceFile.Writer.file(path),
>     SequenceFile.Writer.keyClass(Text.class),
>     SequenceFile.Writer.valueClass(Text.class),
>     SequenceFile.Writer.appendIfExists(true));
> {code}
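>
> One variant worth trying (a sketch, not a confirmed fix): create the file with an explicit codec and pass the identical compression option when appending, so the append-time codec comparison never sees a null codec. On HDFS this may avoid the NPE; on the local filesystem the append will still fail inside ChecksumFileSystem.
> {code:java}
> // Hedged sketch, unverified: use an explicit, matching codec for both the
> // initial write and the append, so neither side of the codec comparison is
> // null. DefaultCodec is org.apache.hadoop.io.compress.DefaultCodec.
> SequenceFile.Writer writer = SequenceFile.createWriter(conf,
>     SequenceFile.Writer.file(path),
>     SequenceFile.Writer.keyClass(Text.class),
>     SequenceFile.Writer.valueClass(Text.class),
>     SequenceFile.Writer.appendIfExists(true),
>     SequenceFile.Writer.compression(
>         SequenceFile.CompressionType.BLOCK, new DefaultCodec()));
> {code}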
>  
> {code:java}
> // all my code
> import java.util.Iterator;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.IOUtils;
> import org.apache.hadoop.io.SequenceFile;
> import org.apache.hadoop.io.Text;
> import org.apache.spark.TaskContext;
> import org.apache.spark.api.java.function.VoidFunction;
>
> import scala.Tuple2;
>
> public class Writer1 implements VoidFunction<Iterator<Tuple2<String, String>>> {
>     private static Configuration conf = new Configuration();
>     private int MAX_LINE = 3; // small number, for testing
>
>     @Override
>     public void call(Iterator<Tuple2<String, String>> iterator) throws Exception {
>         int partitionId = TaskContext.get().partitionId();
>         int count = 0;
>         Path path = new Path("D:/tmp-doc/logs/logs.txt");
>         SequenceFile.Writer writer = null;
>         while (iterator.hasNext()) {
>             Tuple2<String, String> tp = iterator.next();
>             if (writer == null)
>                 writer = SequenceFile.createWriter(conf,
>                         SequenceFile.Writer.file(path),
>                         SequenceFile.Writer.keyClass(Text.class),
>                         SequenceFile.Writer.valueClass(Text.class),
>                         SequenceFile.Writer.appendIfExists(true));
>             writer.append(new Text(tp._1), new Text(tp._2));
>             count++;
>             if (count > MAX_LINE) {
>                 // Roll: close and reopen with the same options as above.
>                 IOUtils.closeStream(writer);
>                 count = 0;
>                 writer = SequenceFile.createWriter(conf,
>                         SequenceFile.Writer.file(path),
>                         SequenceFile.Writer.keyClass(Text.class),
>                         SequenceFile.Writer.valueClass(Text.class),
>                         SequenceFile.Writer.appendIfExists(true));
>             }
>         }
>         IOUtils.closeStream(writer); // closeStream is null-safe
>     }
> }
> {code}
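>
> A side note on the repro (an observation, not part of the reported bug): with setMaster("local[2]") two tasks can run at once, and every partition opens the same D:/tmp-doc/logs/logs.txt, so concurrent appends to one file are possible. partitionId is already computed above; folding it into the path keeps each task on its own file:
> {code:java}
> // Hypothetical per-partition path, to avoid concurrent appends to one file.
> Path path = new Path("D:/tmp-doc/logs/logs-" + partitionId + ".txt");
> {code}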
> The Writer1 above is called from the streaming driver below:
> {code:java}
> import com.xxx.algo.hadoop.Writer1
> import com.xxx.algo.utils.Utils
> import kafka.serializer.StringDecoder
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.streaming.kafka.KafkaUtils
> import org.apache.spark.streaming.{Durations, StreamingContext}
> import org.apache.spark.{SparkConf, SparkContext}
> object KafkaSparkStreamingApp {
>   def main(args: Array[String]): Unit = {
>     val kafka = "192.168.30.4:9092,192.168.30.5:9092,192.168.30.6:9092"
>     val zk = "192.168.30.4:2181,192.168.30.5:2181,192.168.30.6:2181"
>     val topics = Set("test.aries.collection.appevent.biz")
>     val tag = "biz"
>     val durationMillis = 5000
>     val conf = new SparkConf()
>     conf.setAppName("user-log-consumer")
>       .set("spark.serilizer","org.apache.spark.serializer.KryoSerializer")
>       .set("spark.kryo.registrationRequired", "true")
>       .set("spark.defalut.parallelism","2")
>       .set("spark.rdd.compress","true")
>       .setMaster("local[2]")
>     val sc = new SparkContext(conf)
>     val session = SparkSession.builder()
>       .config(conf)
>       .getOrCreate()
>     val ssc = new StreamingContext(sc, Durations.milliseconds(durationMillis))
>     val kafkaParams = Map[String, String](
>       "metadata.broker.list" -> kafka,
>       "bootstrap.servers" -> kafka,
>       "zookeeper.connect" -> zk,
>       "group.id" -> "recommend_stream_spark",
>       "key.serializer" -> 
> "org.apache.kafka.common.serialization.StringSerializer",
>       "key.deserializer" -> 
> "org.apache.kafka.common.serialization.StringDeserializer",
>       "value.deserializer" -> 
> "org.apache.kafka.common.serialization.StringDeserializer"
>     )
>     val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>       ssc,
>       kafkaParams,
>       topics
>     )
>     val timeFieldName = "log_time"
>     stream.foreachRDD(rddMsg => {
>       rddMsg.map(msg => {
>         val value = msg._2
>         val time = Utils.getTime(value, timeFieldName)
>         new Tuple2(time + "," + tag, value)
>       })
>         .toJavaRDD().foreachPartition(new Writer1()) // here
>     })
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}
> More info: [https://stackoverflow.com/questions/53943978/hadoop-sequencefile-createwriter-appendifexists-codec-cause-nullpointerexception]


