Hi Jorn,

Thanks for your kind reply. I do accept that the problem may well be in my
code, and any help would be appreciated.

To give you some context, I checked the source topic in Kafka to see whether
the message had been produced twice, but I could find it only once. Also, a
bug in the code would be more convincing to me if all the messages were
duplicated rather than only a few. Below are my source code and the log
entries for the duplicated message, collected from the entire logs:


                <kafka stream>

                JavaDStream<Map<String, Object>> prePepForMappedJsonStream =
                        stream.map(new Function<String, Map<String, Object>>() {

                    // Instance field, not a local: when the if-branch below is skipped,
                    // call() returns the map left over from the previous call.
                    Map<String, Object> mappedJson = null;

                    @Override
                    public Map<String, Object> call(String inputJsonMessage) {
                        try {
                            // Messages exactly 2 characters long (e.g. "{}") are skipped
                            if (StringUtils.length(inputJsonMessage) != 2) {
                                mappedJson = new HashMap<>();
                                StopWatch watch = StopWatchSingleton.instance();
                                watch.reset();
                                watch.start();
                                logger.info("Transformation-1 Start:{} & Input Message is: {}",
                                        LocalDateTime.now(), inputJsonMessage);
                                JsonToMapPrePepTransformer instance = new JsonToMapPrePepTransformer(
                                        StringUtils.join(store.getName().toLowerCase(), "_yellowbrick"));
                                mappedJson = instance.transformJsonToMap(inputJsonMessage);
                                watch.stop();
                                logger.info("Transformation-1 End:{}, Elapsed:{} & OutputMessage is: {}",
                                        LocalDateTime.now(), watch.getTime(), mappedJson);
                            }
                        } catch (Exception e) {
                            logger.error("", e);
                        }
                        return mappedJson;
                    }
                });
                
                JavaDStream<Map<String, Object>> transformedStream =
                        prePepForMappedJsonStream.map(new Function<Map<String, Object>, Map<String, Object>>() {

                    // Instance field: if readyToTransformMap is null, the previous call's result is returned.
                    Map<String, Object> resultMap = null;

                    @Override
                    public Map<String, Object> call(Map<String, Object> readyToTransformMap) throws Exception {
                        if (readyToTransformMap != null) {
                            StopWatch watch = StopWatchSingleton.instance();
                            watch.reset();
                            watch.start();
                            logger.info("Transformation-2 Start:{} & Input Message is: {}",
                                    LocalDateTime.now(), readyToTransformMap);
                            resultMap = new HashMap<>();
                            resultMap = YBEDFormatter.init(
                                    StringUtils.join(store.getName().toLowerCase(), "_yellowbrick"),
                                    readyToTransformMap);
                            watch.stop();
                            logger.info("Transformation-2 End:{}, Elapsed:{} & OutputMessage is: {}",
                                    LocalDateTime.now(), watch.getTime(), resultMap);
                        }
                        return resultMap;
                    }
                });
                
                JavaDStream<ResultMapHolder> kafkaPreIngestStream =
                        transformedStream.map(new Function<Map<String, Object>, ResultMapHolder>() {

                    // Instance field: if finalTransformedMap is null (or the transformer
                    // throws), the previous call's bean is returned.
                    ResultMapHolder resultMapBean = null;

                    @Override
                    public ResultMapHolder call(Map<String, Object> finalTransformedMap) throws Exception {
                        try {
                            if (finalTransformedMap != null) {
                                StopWatch watch = StopWatchSingleton.instance();
                                watch.reset();
                                watch.start();
                                logger.info("Transformation-3 Start:{} & Input Message is: {}",
                                        LocalDateTime.now(), finalTransformedMap);
                                resultMapBean = MapToArrayTransformerForYBIngestion.instance()
                                        .transformMapToOrderedArrayOfValues(finalTransformedMap, tableColumns);
                                watch.stop();
                                logger.info("Transformation-3 End:{}, Elapsed:{} & OutputMessage is: {}",
                                        LocalDateTime.now(), watch.getTime(),
                                        Arrays.toString(resultMapBean.getOutputRow()));
                            }
                        } catch (Exception e) {
                            logger.error("", e);
                        }
                        return resultMapBean;
                    }
                });
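One thing I have not ruled out yet: mappedJson, resultMap and resultMapBean
above are instance fields of the anonymous Function classes, not local
variables. As far as I understand Spark's closure handling, each task
deserializes its own copy of a function and then calls it for every record in
its partition, so those fields keep their values from one record to the next.
If the topic contains any message exactly 2 characters long (for example {}),
the first function would skip its if-branch, log nothing for Transformation-1,
and return the previous record's map, which the later stages would process and
log a second time. That would fit the logs below, where Transformation-1
appears once for this record but the later stages appear more than once. The
plain-Java sketch below reproduces only that pattern; it does not use Spark,
java.util.function.Function stands in for Spark's Function, and StaleFieldDemo
and the "raw" key are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class StaleFieldDemo {

    // Same shape as the first map function: the result lives in an instance
    // field, so it survives from one call to the next on the same object.
    static class StatefulMapper implements Function<String, Map<String, Object>> {
        private Map<String, Object> mappedJson = null;

        @Override
        public Map<String, Object> apply(String input) {
            if (input.length() != 2) {            // e.g. "{}" is skipped
                mappedJson = new HashMap<>();
                mappedJson.put("raw", input);     // hypothetical stand-in for transformJsonToMap
            }
            return mappedJson;                    // previous call's map when skipped
        }
    }

    // Same logic with a local variable: a skipped input always yields null.
    static class StatelessMapper implements Function<String, Map<String, Object>> {
        @Override
        public Map<String, Object> apply(String input) {
            Map<String, Object> mappedJson = null;
            if (input.length() != 2) {
                mappedJson = new HashMap<>();
                mappedJson.put("raw", input);
            }
            return mappedJson;
        }
    }

    public static void main(String[] args) {
        StatefulMapper stateful = new StatefulMapper();
        System.out.println(stateful.apply("{\"request_id\":\"abc\"}"));
        System.out.println(stateful.apply("{}"));   // prints the previous map again

        StatelessMapper stateless = new StatelessMapper();
        stateless.apply("{\"request_id\":\"abc\"}");
        System.out.println(stateless.apply("{}"));  // prints null
    }
}
```

If this turns out to be the cause, moving the three fields into call() as
local variables would make each call independent; skipped records could then
be dropped with something like JavaDStream.filter(m -> m != null) before the
next map instead of being passed on (untested in the job).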
                
        

Please note the logger statements in the code above. I grepped the logs
across all the executors and found the record that is repeating. Here are the
log entries for each transformation of that message:


18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-1 Start:2018-10-26T20:36:18.975 & Input Message is: {"request_id":"7cad0cb2a7bf427481a83639f82ffcec"}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-1 End:2018-10-26T20:36:18.977, Elapsed:2 & OutputMessage is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 Start:2018-10-26T20:36:18.978 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 End:2018-10-26T20:36:18.981, Elapsed:3 & OutputMessage is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-3 Start:2018-10-26T20:36:18.981 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-3 End:2018-10-26T20:36:18.982, Elapsed:0 & OutputMessage is: [7cad0cb2a7bf427481a83639f82ffcec]
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 Start:2018-10-26T20:36:18.983 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 End:2018-10-26T20:36:18.990, Elapsed:1 & OutputMessage is: [7cad0cb2a7bf427481a83639f82ffcec]
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 Start:2018-10-26T20:36:18.991 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 End:2018-10-26T20:36:18.995, Elapsed:3 & OutputMessage is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-3 Start:2018-10-26T20:36:18.995 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-3 End:2018-10-26T20:36:18.996, Elapsed:1 & OutputMessage is: [7cad0cb2a7bf427481a83639f82ffcec]
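To check other records the same way, the per-stage counts can be pulled out
of the executor logs rather than read by eye. A small sketch, assuming each
log entry sits on one physical line in the executor log files and that the
Start messages keep the format used in the code; StageCounter is just an
illustrative name:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class StageCounter {

    private static final Pattern START = Pattern.compile("Transformation-(\\d+) Start:");

    // Counts, per stage number, the "Start" lines that mention requestId.
    // Assumes each log entry occupies a single physical line.
    static Map<Integer, Integer> countStarts(List<String> logLines, String requestId) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String line : logLines) {
            if (!line.contains(requestId)) {
                continue;
            }
            Matcher m = START.matcher(line);
            if (m.find()) {
                counts.merge(Integer.parseInt(m.group(1)), 1, Integer::sum);
            }
        }
        return counts;
    }

    // Usage: java StageCounter <executor-log-file> <request_id>
    public static void main(String[] args) throws IOException {
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println(countStarts(lines, args[1]));
    }
}
```

Applied to the excerpt above, with each entry on one line, it would print
{1=1, 2=3, 3=2}.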


Please note that this does not happen to all records. For example, in QA,
where I am testing with 16,000 messages, only the single record above is
duplicated (it appears twice in the DB, while Kafka has only one copy); all
the other records appear exactly once in the DB.
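That only one record in 16,000 is affected also makes me wonder about a race.
StopWatchSingleton is not included above; assuming its instance() returns a
single commons-lang StopWatch shared by every task on an executor (as the name
suggests), concurrent tasks would reset and restart each other's watch. The
Elapsed values would then be unreliable (one Transformation-2 entry above
reports Elapsed:1 although its Start and End timestamps are 7 ms apart), and
start() or stop() can throw IllegalStateException when called in the wrong
state: the first and third functions catch and log that, while the second one
would fail the task, which Spark then retries. A sketch of timing with a local
start value instead (LocalTiming and Timed are hypothetical names):

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class LocalTiming {

    // Result of one timed call.
    static final class Timed<T> {
        final T value;
        final long elapsedMs;

        Timed(T value, long elapsedMs) {
            this.value = value;
            this.elapsedMs = elapsedMs;
        }
    }

    // The start time is a local variable, so concurrent callers in the same
    // executor JVM cannot reset or stop each other's measurement.
    static <T> Timed<T> time(Supplier<T> work) {
        long start = System.nanoTime();
        T result = work.get();
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return new Timed<>(result, elapsedMs);
    }

    public static void main(String[] args) {
        Timed<String> t = time(() -> "transformed");
        System.out.println(t.value + " took " + t.elapsedMs + " ms");
    }
}
```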

Let me know if you need anything else. Once again, thanks for looking into
this.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
