[
https://issues.apache.org/jira/browse/SPARK-14737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15278375#comment-15278375
]
Faisal commented on SPARK-14737:
--------------------------------
{code}
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
public class DataProcessor2 implements Serializable {
    private static final long serialVersionUID = 3071125481526170241L;
    private static Logger log = LoggerFactory.getLogger("DataProcessor");

    public static void main(String[] args) {
        final String sparkCheckPointDir =
                ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);
        DataProcessorContextFactory3 factory = new DataProcessorContextFactory3();
        JavaStreamingContext jssc =
                JavaStreamingContext.getOrCreate(sparkCheckPointDir, factory);
        // Start the process
        jssc.start();
        jssc.awaitTermination();
    }
}
class DataProcessorContextFactory3 implements JavaStreamingContextFactory, Serializable {
    private static final long serialVersionUID = 6070911284191531450L;
    private static Logger logger = LoggerFactory.getLogger(DataProcessorContextFactory3.class);

    DataProcessorContextFactory3() {
    }

    @Override
    public JavaStreamingContext create() {
        logger.debug("creating new context..!");
        final String brokers = ApplicationProperties.getProperty(Consts.KAFKA_BROKERS_NAME);
        final String topic = ApplicationProperties.getProperty(Consts.KAFKA_TOPIC_NAME);
        final String app = "app1";
        final String offset = ApplicationProperties.getProperty(Consts.KAFKA_CONSUMER_OFFSET, "largest");
        logger.debug("Data processing configuration. brokers={}, topic={}, app={}, offset={}",
                brokers, topic, app, offset);
        if (StringUtils.isBlank(brokers) || StringUtils.isBlank(topic) || StringUtils.isBlank(app)) {
            System.err.println("Usage: DataProcessor <brokers> <topic>\n"
                    + Consts.KAFKA_BROKERS_NAME + " is a list of one or more Kafka brokers separated by comma\n"
                    + Consts.KAFKA_TOPIC_NAME + " is a kafka topic to consume from\n\n\n");
            System.exit(1);
        }
        final String majorVersion = "1.0";
        final String minorVersion = "3";
        final String version = majorVersion + "." + minorVersion;
        final String applicationName = "DataProcessor-" + topic + "-" + version;
        // for dev environment
        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(applicationName);
        // for cluster environment
        // SparkConf sparkConf = new SparkConf().setAppName(applicationName);
        final long sparkBatchDuration =
                Long.valueOf(ApplicationProperties.getProperty(Consts.SPARK_BATCH_DURATION, "10"));
        final String sparkCheckPointDir =
                ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);
        JavaStreamingContext jssc =
                new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchDuration));
        logger.debug("setting checkpoint directory={}", sparkCheckPointDir);
        jssc.checkpoint(sparkCheckPointDir);
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("auto.offset.reset", offset);
        kafkaParams.put("group.id", "app1");
        // @formatter:off
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );
        // @formatter:on
        processRDD(messages, app);
        return jssc;
    }
    private void processRDD(JavaPairInputDStream<String, String> messages, final String app) {
        JavaDStream<MsgStruct> rdd = messages.map(new MessageProcessFunction());
        rdd.foreachRDD(new Function<JavaRDD<MsgStruct>, Void>() {
            private static final long serialVersionUID = 250647626267731218L;

            @Override
            public Void call(JavaRDD<MsgStruct> currentRdd) throws Exception {
                if (!currentRdd.isEmpty()) {
                    logger.debug("Receive RDD. Create JobDispatcherFunction at HOST={}",
                            FunctionUtil.getHostName());
                    currentRdd.foreachPartition(new VoidFunction<Iterator<MsgStruct>>() {
                        @Override
                        public void call(Iterator<MsgStruct> arg0) throws Exception {
                            while (arg0.hasNext()) {
                                System.out.println(arg0.next().toString());
                            }
                        }
                    });
                } else {
                    logger.debug("Current RDD is empty.");
                }
                return null;
            }
        });
    }
    public static class MessageProcessFunction implements Function<Tuple2<String, String>, MsgStruct> {
        @Override
        public MsgStruct call(Tuple2<String, String> data) throws Exception {
            String message = data._2();
            System.out.println("message:" + message);
            return MsgStruct.parse(message);
        }
    }

    public static class MsgStruct implements Serializable {
        private String message;

        public static MsgStruct parse(String msg) {
            MsgStruct m = new MsgStruct();
            m.message = msg;
            return m;
        }

        @Override
        public String toString() {
            return "content inside=" + message;
        }
    }
}
{code}
> Kafka Brokers are down - spark stream should retry
> --------------------------------------------------
>
> Key: SPARK-14737
> URL: https://issues.apache.org/jira/browse/SPARK-14737
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.3.0
> Environment: Suse Linux, Cloudera Enterprise 5.4.8 (#7 built by
> jenkins on 20151023-1205 git: d7dbdf29ac1d57ae9fb19958502d50dcf4e4fffd),
> kafka_2.10-0.8.2.2
> Reporter: Faisal
>
> I have a Spark Streaming application that uses the Kafka direct stream
> approach, listening to a Kafka topic.
> {code}
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", "broker1,broker2,broker3");
> kafkaParams.put("auto.offset.reset", "largest");
> HashSet<String> topicsSet = new HashSet<String>();
> topicsSet.add("Topic1");
> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>         jssc,
>         String.class,
>         String.class,
>         StringDecoder.class,
>         StringDecoder.class,
>         kafkaParams,
>         topicsSet
> );
> {code}
> I noticed that when I stop or shut down the Kafka brokers, my Spark
> application also shuts down.
> Here is the Spark execution script:
> {code}
> spark-submit \
>   --master yarn-cluster \
>   --files /home/siddiquf/spark/log4j-spark.xml \
>   --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-spark.xml" \
>   --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-spark.xml" \
>   --class com.example.MyDataStreamProcessor \
>   myapp.jar
> {code}
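> As a side note, one knob we have not verified yet (an assumption on our
> part, not something tested in this environment) is letting YARN re-attempt
> the whole application when the driver exits, via spark.yarn.maxAppAttempts:
> {code}
> # Hypothetical variant of the script above: YARN restarts the application
> # master on failure, capped by yarn.resourcemanager.am.max-attempts on the
> # cluster side.
> spark-submit \
>   --master yarn-cluster \
>   --conf spark.yarn.maxAppAttempts=4 \
>   --class com.example.MyDataStreamProcessor \
>   myapp.jar
> {code}
> This restarts the driver from scratch rather than retrying inside the
> stream, so at best it papers over short broker outages.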
> The Spark job (submitted with the script above) starts successfully, and I
> can track the application driver and worker/executor nodes.
> Everything works fine; my only concern is that if the Kafka brokers go
> offline or are restarted, my application, which is managed by YARN, should
> not shut down, yet it does.
> If this is the expected behavior, how should such a situation be handled
> with the least maintenance? Keep in mind that the Kafka cluster is not part
> of the Hadoop cluster and is managed by a different team, which is why our
> application needs to be resilient.
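> For illustration, the kind of driver-side supervision loop we would
> otherwise have to hand-roll looks roughly like this (a sketch only;
> DataProcessorContextFactory3 is the factory class shown in the comment
> above, and the fixed 30-second backoff and the stop() semantics are
> assumptions to check):
> {code}
> public static void main(String[] args) throws Exception {
>     final String sparkCheckPointDir =
>             ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);
>     final DataProcessorContextFactory3 factory = new DataProcessorContextFactory3();
>     while (true) {
>         // Recover from the checkpoint if present, otherwise build a fresh context
>         JavaStreamingContext jssc =
>                 JavaStreamingContext.getOrCreate(sparkCheckPointDir, factory);
>         try {
>             jssc.start();
>             jssc.awaitTermination();
>             break; // clean shutdown: leave the loop
>         } catch (Exception e) {
>             log.warn("Streaming context died (brokers unreachable?); retrying", e);
>             jssc.stop(true, false); // also stop the SparkContext, not gracefully
>             Thread.sleep(30000L);   // fixed backoff, purely illustrative
>         }
>     }
> }
> {code}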
> Thanks
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)