[ https://issues.apache.org/jira/browse/HADOOP-16021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Steve Loughran resolved HADOOP-16021.
-------------------------------------
    Resolution: Duplicate

SequenceFile.createWriter appendIfExists codec cause NullPointerException
--------------------------------------------------------------------------

                 Key: HADOOP-16021
                 URL: https://issues.apache.org/jira/browse/HADOOP-16021
             Project: Hadoop Common
          Issue Type: Bug
          Components: common
    Affects Versions: 2.7.3
         Environment: Windows 10 or Linux (CentOS), Hadoop 2.7.3, JDK 8
            Reporter: asin
            Priority: Major
              Labels: bug
         Attachments: 055.png, 62.png, CompressionType.BLOCK-Not supported-error log.txt, CompressionType.NONE-NullPointerException-error log.txt

I want to append data to an existing file. When I create the writer with SequenceFile.Writer.appendIfExists(true), the constructor throws a NullPointerException at org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1119). If I remove the appendIfExists option the write succeeds, but the old file is overwritten. When I try CompressionType.RECORD or CompressionType.BLOCK instead, a "not supported" exception is thrown.

{code:java}
// my code
SequenceFile.Writer writer = null;
writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(path),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(Text.class),
        SequenceFile.Writer.appendIfExists(true));
{code}

{code:java}
// all my code
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class Writer1 implements VoidFunction<Iterator<Tuple2<String, String>>> {

    private static Configuration conf = new Configuration();
    private int MAX_LINE = 3; // small number, for testing

    @Override
    public void call(Iterator<Tuple2<String, String>> iterator) throws Exception {
        int partitionId = TaskContext.get().partitionId();
        int count = 0;
        SequenceFile.Writer writer = null;
        while (iterator.hasNext()) {
            Tuple2<String, String> tp = iterator.next();
            Path path = new Path("D:/tmp-doc/logs/logs.txt");
            if (writer == null)
                writer = SequenceFile.createWriter(conf,
                        SequenceFile.Writer.file(path),
                        SequenceFile.Writer.keyClass(Text.class),
                        SequenceFile.Writer.valueClass(Text.class),
                        SequenceFile.Writer.appendIfExists(true));
            writer.append(new Text(tp._1), new Text(tp._2));
            count++;
            if (count > MAX_LINE) {
                IOUtils.closeStream(writer);
                count = 0;
                writer = SequenceFile.createWriter(... // same as above
            }
        }
        if (count > 0) {
            IOUtils.closeStream(writer);
        }
        IOUtils.closeStream(writer);
    }
}
{code}
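The issue should also be reproducible without Spark. Below is a minimal sketch distilled from the code above (the local path and the sample key/value strings are illustrative assumptions, not from the original report): the file is created and closed once, then reopened with appendIfExists(true) and no explicit compression option, which is the step where the NullPointerException is reported.

{code:java}
// Minimal, Spark-free sketch distilled from the reporter's code.
// The path and sample records are illustrative only.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class AppendIfExistsRepro {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/append-if-exists-repro.seq"); // illustrative local path

        // First pass: the file does not exist yet, so the writer is created normally.
        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class),
                SequenceFile.Writer.appendIfExists(true));
        writer.append(new Text("k1"), new Text("v1"));
        IOUtils.closeStream(writer);

        // Second pass: the file now exists, so the append path is taken.
        // Per the report, this is where the NullPointerException is thrown
        // when no explicit compression option is supplied.
        writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class),
                SequenceFile.Writer.appendIfExists(true));
        writer.append(new Text("k2"), new Text("v2"));
        IOUtils.closeStream(writer);
    }
}
{code}

The first createWriter call succeeds because the file does not exist yet; only the second call, which takes the append path, is expected to fail.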
The Writer1 class above is called from the following Spark Streaming driver:

{code:scala}
import com.xxx.algo.hadoop.Writer1
import com.xxx.algo.utils.Utils
import kafka.serializer.StringDecoder
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object KafkaSparkStreamingApp {
  def main(args: Array[String]): Unit = {
    val kafka = "192.168.30.4:9092,192.168.30.5:9092,192.168.30.6:9092"
    val zk = "192.168.30.4:2181,192.168.30.5:2181,192.168.30.6:2181"
    val topics = Set("test.aries.collection.appevent.biz")
    val tag = "biz"
    val durationSeconds = 5000
    val conf = new SparkConf()
    conf.setAppName("user-log-consumer")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // fixed key: was "spark.serilizer"
      .set("spark.kryo.registrationRequired", "true")
      .set("spark.default.parallelism", "2") // fixed key: was "spark.defalut.parallelism"
      .set("spark.rdd.compress", "true")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    val session = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    val ssc = new StreamingContext(sc, Durations.milliseconds(durationSeconds))
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> kafka,
      "bootstrap.servers" -> kafka,
      "zookeeper.connect" -> zk,
      "group.id" -> "recommend_stream_spark",
      "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics
    )
    val timeFieldName = "log_time"
    stream.foreachRDD(rddMsg => {
      rddMsg.map(msg => {
        val value = msg._2
        val time = Utils.getTime(value, timeFieldName)
        new Tuple2(time + "," + tag, value)
      })
        .toJavaRDD().foreachPartition(new Writer1()) // the writer is invoked here
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

More info: [https://stackoverflow.com/questions/53943978/hadoop-sequencefile-createwriter-appendifexists-codec-cause-nullpointerexception]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-dev-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-dev-h...@hadoop.apache.org