phanikumar created SPARK-30522:
----------------------------------

             Summary: Spark Streaming dynamic executors override Kafka parameters in cluster mode
                 Key: SPARK-30522
                 URL: https://issues.apache.org/jira/browse/SPARK-30522
             Project: Spark
          Issue Type: Bug
          Components: Java API
    Affects Versions: 2.3.2
            Reporter: phanikumar


I have written a Spark Streaming consumer to consume data from Kafka, and I noticed some weird behavior in my logs. The Kafka topic has 3 partitions, and the Spark Streaming job launches one executor per partition.
The first executor always takes the Kafka parameters I provided while creating the streaming context, but the executors with IDs 2 and 3 always override those parameters.
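For quick reference, the Kafka parameters in question boil down to the following condensed sketch (extracted from the full init()/execute() code further below; the broker list here is only a placeholder for the value read from PropertyFileReader):
{code:java}
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;

// Condensed from init() below: these are the parameters every executor is expected to use.
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // placeholder value
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "telemetry-streaming-service");
kafkaParams.put("auto.offset.reset", "latest");   // the value executors 2 and 3 do not honor
kafkaParams.put("enable.auto.commit", false);
kafkaParams.put("client.id", "client-0");

List<String> topics = Arrays.asList("telemetry"); // topic name as it appears in the logs
{code}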
   
{code:java}
20/01/14 12:15:05 WARN StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sources like Flume. See the programming guide for details on how to enable the Write Ahead Log.
20/01/14 12:15:05 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Recovered 2 write ahead log files from hdfs://tlabnamenode/checkpoint/receivedBlockMetadata
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Slide time = 5000 ms
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Checkpoint interval = null
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Remember interval = 5000 ms
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@12665f3f
20/01/14 12:15:05 INFO ForEachDStream: Slide time = 5000 ms
20/01/14 12:15:05 INFO ForEachDStream: Storage level = Serialized 1x Replicated
20/01/14 12:15:05 INFO ForEachDStream: Checkpoint interval = null
20/01/14 12:15:05 INFO ForEachDStream: Remember interval = 5000 ms
20/01/14 12:15:05 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@a4d83ac
20/01/14 12:15:05 INFO ConsumerConfig: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [1,2,3]
    check.crcs = true
    client.id = client-0
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = telemetry-streaming-service
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
{code}
Here is the log from the other executors.
   
{code:java}
20/01/14 12:15:04 INFO Executor: Starting executor ID 2 on host 1
20/01/14 12:15:04 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40324.
20/01/14 12:15:04 INFO NettyBlockTransferService: Server created on 1
20/01/14 12:15:04 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/01/14 12:15:04 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:04 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:04 INFO BlockManager: external shuffle service port = 7447
20/01/14 12:15:04 INFO BlockManager: Registering executor with local external shuffle service.
20/01/14 12:15:04 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:7447 after 1 ms (0 ms spent in bootstraps)
20/01/14 12:15:04 INFO BlockManager: Initialized BlockManager: BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:19 INFO CoarseGrainedExecutorBackend: Got assigned task 1
20/01/14 12:15:19 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
20/01/14 12:15:19 INFO TorrentBroadcast: Started reading broadcast variable 0
20/01/14 12:15:19 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:38759 after 2 ms (0 ms spent in bootstraps)
20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 8.1 KB, free 6.2 GB)
20/01/14 12:15:20 INFO TorrentBroadcast: Reading broadcast variable 0 took 163 ms
20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 17.9 KB, free 6.2 GB)
20/01/14 12:15:20 INFO KafkaRDD: Computing topic telemetry, partition 1 offsets 237352170 -> 237352311
20/01/14 12:15:20 INFO CachedKafkaConsumer: Initializing cache 16 64 0.75
20/01/14 12:15:20 INFO CachedKafkaConsumer: Cache miss for CacheKey(spark-executor-telemetry-streaming-service,telemetry,1)
20/01/14 12:15:20 INFO ConsumerConfig: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = none
    bootstrap.servers = [1,2,3]
    check.crcs = true
    client.id = client-0
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
{code}
If we observe closely, in the first executor **auto.offset.reset is latest**, but for the other executors **auto.offset.reset = none**.
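For comparison, here is a minimal sketch (not presented as a fix for the reported override) of how starting offsets can be supplied explicitly to the Subscribe consumer strategy, so the starting position is stated directly instead of being derived from auto.offset.reset. The topic name matches the logs above; the partition offsets are hypothetical placeholders, and streamingContext/kafkaParams are the ones built in the code below.
{code:java}
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

// Hypothetical starting offsets for the 3 partitions of the "telemetry" topic;
// in practice these would come from committed/stored offsets.
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicPartition("telemetry", 0), 0L);
fromOffsets.put(new TopicPartition("telemetry", 1), 0L);
fromOffsets.put(new TopicPartition("telemetry", 2), 0L);

List<String> topics = Arrays.asList("telemetry");

// streamingContext and kafkaParams are created exactly as in the init() method below.
JavaInputDStream<ConsumerRecord<String, String>> telemetryStream = KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams, fromOffsets));
{code}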

Here is how I am creating the streaming context:
 
{code:java}
public void init() throws Exception {
    final String BOOTSTRAP_SERVERS = PropertyFileReader.getInstance()
            .getProperty("spark.streaming.kafka.broker.list");
    final String DYNAMIC_ALLOCATION_ENABLED = PropertyFileReader.getInstance()
            .getProperty("spark.streaming.dynamicAllocation.enabled");
    final String DYNAMIC_ALLOCATION_SCALING_INTERVAL = PropertyFileReader.getInstance()
            .getProperty("spark.streaming.dynamicAllocation.scalingInterval");
    final String DYNAMIC_ALLOCATION_MIN_EXECUTORS = PropertyFileReader.getInstance()
            .getProperty("spark.streaming.dynamicAllocation.minExecutors");
    final String DYNAMIC_ALLOCATION_MAX_EXECUTORS = PropertyFileReader.getInstance()
            .getProperty("spark.streaming.dynamicAllocation.maxExecutors");
    final String DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT = PropertyFileReader.getInstance()
            .getProperty("spark.streaming.dynamicAllocation.executorIdleTimeout");
    final String DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT = PropertyFileReader.getInstance()
            .getProperty("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout");
    final String SPARK_SHUFFLE_SERVICE_ENABLED = PropertyFileReader.getInstance()
            .getProperty("spark.shuffle.service.enabled");
    final String SPARK_LOCALITY_WAIT = PropertyFileReader.getInstance().getProperty("spark.locality.wait");
    final String SPARK_KAFKA_CONSUMER_POLL_INTERVAL = PropertyFileReader.getInstance()
            .getProperty("spark.streaming.kafka.consumer.poll.ms");
    final String SPARK_KAFKA_MAX_RATE_PER_PARTITION = PropertyFileReader.getInstance()
            .getProperty("spark.streaming.kafka.maxRatePerPartition");
    final String SPARK_BATCH_DURATION_IN_SECONDS = PropertyFileReader.getInstance()
            .getProperty("spark.batch.duration.in.seconds");
    final String KAFKA_TOPIC = PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.topic");

    LOGGER.debug("connecting to brokers ::" + BOOTSTRAP_SERVERS);
    LOGGER.debug("bootstrapping properties to create consumer");

    // Kafka consumer parameters handed to the direct stream below.
    kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "telemetry-streaming-service");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);
    kafkaParams.put("client.id", "client-0");
    // Below property should be enabled in properties and changed based on
    // performance testing
    kafkaParams.put("max.poll.records",
            PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.max.poll.records"));

    LOGGER.info("registering as a consumer with the topic :: " + KAFKA_TOPIC);
    topics = Arrays.asList(KAFKA_TOPIC);

    sparkConf = new SparkConf()
            // .setMaster(PropertyFileReader.getInstance().getProperty("spark.master.url"))
            .setAppName(PropertyFileReader.getInstance().getProperty("spark.application.name"))
            .set("spark.streaming.dynamicAllocation.enabled", DYNAMIC_ALLOCATION_ENABLED)
            .set("spark.streaming.dynamicAllocation.scalingInterval", DYNAMIC_ALLOCATION_SCALING_INTERVAL)
            .set("spark.streaming.dynamicAllocation.minExecutors", DYNAMIC_ALLOCATION_MIN_EXECUTORS)
            .set("spark.streaming.dynamicAllocation.maxExecutors", DYNAMIC_ALLOCATION_MAX_EXECUTORS)
            .set("spark.streaming.dynamicAllocation.executorIdleTimeout", DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
            .set("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout",
                    DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)
            .set("spark.shuffle.service.enabled", SPARK_SHUFFLE_SERVICE_ENABLED)
            .set("spark.locality.wait", SPARK_LOCALITY_WAIT)
            .set("spark.streaming.kafka.consumer.poll.ms", SPARK_KAFKA_CONSUMER_POLL_INTERVAL)
            .set("spark.streaming.kafka.maxRatePerPartition", SPARK_KAFKA_MAX_RATE_PER_PARTITION);

    LOGGER.debug("creating streaming context with batch interval in seconds ::: " + SPARK_BATCH_DURATION_IN_SECONDS);
    streamingContext = new JavaStreamingContext(sparkConf,
            Durations.seconds(Integer.parseInt(SPARK_BATCH_DURATION_IN_SECONDS)));
    /*
     * todo: add checkpointing to the streaming context to recover from driver
     * failures and also for offset management
     */
    LOGGER.info("checkpointing the streaming transactions at hdfs path :: /checkpoint");
    streamingContext.checkpoint("/checkpoint");
    streamingContext.addStreamingListener(new DataProcessingListener());
}

@Override
public void execute() throws InterruptedException {
    LOGGER.info("started telemetry pipeline executor to consume data");
    // Consume data from the Kafka topic.
    JavaInputDStream<ConsumerRecord<String, String>> telemetryStream = KafkaUtils.createDirectStream(
            streamingContext, LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topics, kafkaParams));

    telemetryStream.foreachRDD(rawRDD -> {
        if (!rawRDD.isEmpty()) {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rawRDD.rdd()).offsetRanges();
            LOGGER.debug("list of OffsetRanges getting processed as a string :: "
                    + Arrays.asList(offsetRanges).toString());
            System.out.println("offsetRanges : " + offsetRanges.length);
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rawRDD.context().getConf());

            // Flatten each JSON record into a (sensor_path, sensor_data) pair.
            JavaPairRDD<String, String> flattenedRawRDD = rawRDD.mapToPair(record -> {
                // LOGGER.debug("flattening JSON record with telemetry json value ::: " + record.value());
                ObjectMapper om = new ObjectMapper();
                JsonNode root = om.readTree(record.value());
                Map<String, JsonNode> flattenedMap = new FlatJsonGenerator(root).flatten();
                JsonNode flattenedRootNode = om.convertValue(flattenedMap, JsonNode.class);
                // LOGGER.debug("creating Tuple for the JSON record Key :: " + flattenedRootNode.get("/name").asText()
                //         + ", value :: " + flattenedRootNode.toString());
                return new Tuple2<String, String>(flattenedRootNode.get("/name").asText(),
                        flattenedRootNode.toString());
            });

            Dataset<Row> rawFlattenedDataRDD = spark
                    .createDataset(flattenedRawRDD.rdd(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
                    .toDF("sensor_path", "sensor_data");
            Dataset<Row> groupedDS = rawFlattenedDataRDD.groupBy(col("sensor_path"))
                    .agg(collect_list(col("sensor_data").as("sensor_data")));
            Dataset<Row> lldpGroupedDS = groupedDS.filter((FilterFunction<Row>) r -> r.getString(0)
                    .equals("Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/devices/device"));

            LOGGER.info("printing the LLDP GROUPED DS ------------------>");
            lldpGroupedDS.show(2);

            LOGGER.info("creating telemetry pipeline to process the telemetry data");
            HashMap<Object, Object> params = new HashMap<>();
            params.put(DPConstants.OTSDB_CONFIG_F_PATH,
                    ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.config.file.path"));
            params.put(DPConstants.OTSDB_CLIENT_TYPE,
                    ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.client.type"));

            try {
                LOGGER.info("<-------------------processing lldp data and write to hive STARTED ----------------->");
                Pipeline lldpPipeline = PipelineFactory.getPipeline(PipelineType.LLDPTELEMETRY);
                lldpPipeline.process(lldpGroupedDS, null);
                LOGGER.info("<-------------------processing lldp data and write to hive COMPLETED ----------------->");

                LOGGER.info("<-------------------processing groupedDS data and write to OPENTSDB STARTED ----------------->");
                Pipeline pipeline = PipelineFactory.getPipeline(PipelineType.TELEMETRY);
                pipeline.process(groupedDS, params);
                LOGGER.info("<-------------------processing groupedDS data and write to OPENTSDB COMPLETED ----------------->");
            } catch (Throwable t) {
                t.printStackTrace();
            }

            LOGGER.info("committing offsets after processing the batch");
            ((CanCommitOffsets) telemetryStream.inputDStream()).commitAsync(offsetRanges);
        }
    });

    streamingContext.start();
    streamingContext.awaitTermination();
}
{code}


