[
https://issues.apache.org/jira/browse/HADOOP-16021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
asin updated HADOOP-16021:
--------------------------
Description:
I want to append data to an existing file. When I use SequenceFile.Writer.appendIfExists, it
throws a NullPointerException at
org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1119).
When I remove the appendIfExists option it works, but it overwrites the old file.
When I try CompressionType.RECORD or CompressionType.BLOCK, it throws a "not
supported" exception.
{code:java}
// my code
SequenceFile.Writer writer = null;
writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(path),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(Text.class),
        SequenceFile.Writer.appendIfExists(true));
{code}
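A possible workaround, based on my reading of the 2.7.x append path (an assumption, not something confirmed in this report): the Writer constructor around SequenceFile.java:1119 compares the existing file's compression codec against the codec in the writer options, and an uncompressed file reports a null codec, which would explain the NullPointerException. Creating the file with an explicit non-null codec and passing the exact same compression option on every append may sidestep the null comparison; whether it also avoids the "not supported" error seen with RECORD/BLOCK is untested here. The class name AppendWorkaroundSketch and the choice of DefaultCodec are mine, for illustration only.
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;

public class AppendWorkaroundSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("D:/tmp-doc/logs/logs.txt");
        // Hedged workaround (unverified): pass the same explicit, non-null codec
        // on the initial create and on every append, so the append path never
        // compares against a null codec.
        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class),
                SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD,
                        new DefaultCodec()),
                SequenceFile.Writer.appendIfExists(true));
        try {
            writer.append(new Text("key"), new Text("value"));
        } finally {
            writer.close();
        }
    }
}
{code}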
{code:java}
// all my code
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class Writer1 implements VoidFunction<Iterator<Tuple2<String, String>>> {
    private static Configuration conf = new Configuration();
    private int MAX_LINE = 3; // deliberately small, for testing

    @Override
    public void call(Iterator<Tuple2<String, String>> iterator) throws Exception {
        int partitionId = TaskContext.get().partitionId(); // computed but not used below
        int count = 0;
        Path path = new Path("D:/tmp-doc/logs/logs.txt");
        SequenceFile.Writer writer = null;
        while (iterator.hasNext()) {
            Tuple2<String, String> tp = iterator.next();
            if (writer == null) {
                writer = SequenceFile.createWriter(conf,
                        SequenceFile.Writer.file(path),
                        SequenceFile.Writer.keyClass(Text.class),
                        SequenceFile.Writer.valueClass(Text.class),
                        SequenceFile.Writer.appendIfExists(true));
            }
            writer.append(new Text(tp._1), new Text(tp._2));
            count++;
            // roll the file every MAX_LINE records: close, then reopen in append mode
            if (count > MAX_LINE) {
                IOUtils.closeStream(writer);
                count = 0;
                writer = SequenceFile.createWriter(conf,
                        SequenceFile.Writer.file(path),
                        SequenceFile.Writer.keyClass(Text.class),
                        SequenceFile.Writer.valueClass(Text.class),
                        SequenceFile.Writer.appendIfExists(true)); // same options as above
            }
        }
        IOUtils.closeStream(writer); // null-safe close
    }
}
{code}
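For completeness, a small read-back check can confirm whether records actually accumulate across the close/reopen cycles in Writer1. This uses the standard SequenceFile.Reader options API; the class name ReadBackCheck is hypothetical, not from the report.
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class ReadBackCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("D:/tmp-doc/logs/logs.txt");
        // Print every key/value pair currently in the sequence file.
        SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(path));
        try {
            Text key = new Text();
            Text value = new Text();
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            reader.close();
        }
    }
}
{code}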
The Writer1 class above is invoked from the Spark Streaming job below:
{code:scala}
import com.xxx.algo.hadoop.Writer1
import com.xxx.algo.utils.Utils
import kafka.serializer.StringDecoder
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object KafkaSparkStreamingApp {
  def main(args: Array[String]): Unit = {
    val kafka = "192.168.30.4:9092,192.168.30.5:9092,192.168.30.6:9092"
    val zk = "192.168.30.4:2181,192.168.30.5:2181,192.168.30.6:2181"
    val topics = Set("test.aries.collection.appevent.biz")
    val tag = "biz"
    val durationMillis = 5000 // batch interval in milliseconds
    val conf = new SparkConf()
    conf.setAppName("user-log-consumer")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // was misspelled "spark.serilizer"
      .set("spark.kryo.registrationRequired", "true")
      .set("spark.default.parallelism", "2") // was misspelled "spark.defalut.parallelism"
      .set("spark.rdd.compress", "true")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    val session = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    val ssc = new StreamingContext(sc, Durations.milliseconds(durationMillis))
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> kafka,
      "bootstrap.servers" -> kafka,
      "zookeeper.connect" -> zk,
      "group.id" -> "recommend_stream_spark",
      "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics
    )
    val timeFieldName = "log_time"
    stream.foreachRDD(rddMsg => {
      rddMsg.map(msg => {
        val value = msg._2
        val time = Utils.getTime(value, timeFieldName)
        new Tuple2(time + "," + tag, value)
      })
        .toJavaRDD().foreachPartition(new Writer1())
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}
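One detail worth flagging as an aside (my observation, not part of the original report): every partition writes to the same fixed path, while partitionId is computed in Writer1 but never used, so two partitions of the same batch can open and append to one file concurrently. A hypothetical variant of the path construction that avoids that collision:
{code:java}
// Hypothetical (not from the report): give each partition its own file so
// concurrent executors never append to the same path.
Path path = new Path("D:/tmp-doc/logs/logs-" + partitionId + ".txt");
{code}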
More info:
[https://stackoverflow.com/questions/53943978/hadoop-sequencefile-createwriter-appendifexists-codec-cause-nullpointerexception]
> SequenceFile.createWriter appendIfExists codec cause NullPointerException
> -------------------------------------------------------------------------
>
> Key: HADOOP-16021
> URL: https://issues.apache.org/jira/browse/HADOOP-16021
> Project: Hadoop Common
> Issue Type: Bug
> Components: common
> Affects Versions: 2.7.3
> Environment: windows10, hadoop2.7.3, jdk8
> Reporter: asin
> Priority: Major
> Labels: bug
> Attachments: 57.png
>
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]