Hi Jorn, Thanks for your kind reply. I do accept that there might be something in the code. Any help would be appreciated.
To give you some insights, I checked the source of the message in kafka if it has been repeated twice. But, I could only find it once. Also, it would have been convincing if all the messages are duplicated instead of only few. Please find below my source code & also a snapshot of the message that is getting duplicated in the entire logs: <kafka stream> JavaDStream<Map<String, Object>> prePepForMappedJsonStream = stream.map(new Function<String, Map<String, Object>>() { Map<String, Object> mappedJson = null; @Override public Map<String, Object> call(String inputJsonMessage) { try { if(StringUtils.length(inputJsonMessage) != 2) { mappedJson = new HashMap<>(); StopWatch watch = StopWatchSingleton.instance(); watch.reset();watch.start(); logger.info("Transformation-1 Start:{} & Input Message is: {}", LocalDateTime.now(),inputJsonMessage); JsonToMapPrePepTransformer instance = new JsonToMapPrePepTransformer(StringUtils.join(store.getName().toLowerCase(),"_yellowbrick")); mappedJson = instance.transformJsonToMap(inputJsonMessage); watch.stop(); logger.info("Transformation-1 End:{}, Elapsed:{} & OutputMessage is: {}", LocalDateTime.now(), watch.getTime(), mappedJson); } } catch (Exception e) { logger.error("",e); } } return mappedJson; } }); JavaDStream<Map<String, Object>> transformedStream = prePepForMappedJsonStream.map(new Function<Map<String,Object>, Map<String, Object>>() { Map<String, Object> resultMap = null; @Override public Map<String, Object> call(Map<String, Object> readyToTransformMap) throws Exception { if(readyToTransformMap != null) { StopWatch watch = StopWatchSingleton.instance(); watch.reset();watch.start(); logger.info("Transformation-2 Start:{} & Input Message is: {}", LocalDateTime.now(),readyToTransformMap); resultMap = new HashMap<>(); resultMap = YBEDFormatter.init(StringUtils.join(store.getName().toLowerCase(),"_yellowbrick"), readyToTransformMap); watch.stop(); logger.info("Transformation-2 End:{}, Elapsed:{} & OutputMessage is: {}", LocalDateTime.now(), watch.getTime(), resultMap); } return resultMap; } }); JavaDStream<ResultMapHolder> kafkaPreIngestStream = transformedStream.map(new Function<Map<String,Object>, ResultMapHolder>() { ResultMapHolder resultMapBean = null; @Override public ResultMapHolder call(Map<String, Object> finalTransformedMap) throws Exception { try { if(finalTransformedMap != null) { StopWatch watch = StopWatchSingleton.instance(); watch.reset();watch.start(); logger.info("Transformation-3 Start:{} & Input Message is: {}", LocalDateTime.now(),finalTransformedMap); resultMapBean = MapToArrayTransformerForYBIngestion.instance().transformMapToOrderedArrayOfValues(finalTransformedMap, tableColumns); watch.stop(); logger.info("Transformation-3 End:{}, Elapsed:{} & OutputMessage is: {}", LocalDateTime.now(), watch.getTime(), Arrays.toString(resultMapBean.getOutputRow())); } } catch (Exception e) { logger.error("",e); } return resultMapBean; } }); Please observe the loggers in the above code. I grepped through the entire logs across all the executors and found the record that is repeating. Please find the transformations of that message from logs: 18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-1 Start:2018-10-26T20:36:18.975 & Input Message is: {"request_id":"7cad0cb2a7bf427481a83639f82ffcec"} 18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-1 End:2018-10-26T20:36:18.977, Elapsed:2 & OutputMessage is: {request_id=7cad0cb2a7bf427481a83639f82ffcec} 18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 Start:2018-10-26T20:36:18.978 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec} 18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 End:2018-10-26T20:36:18.981, Elapsed:3 & OutputMessage is: {request_id=7cad0cb2a7bf427481a83639f82ffcec} 18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-3 Start:2018-10-26T20:36:18.981 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec} 18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-3 End:2018-10-26T20:36:18.982, Elapsed:0 & OutputMessage is: [7cad0cb2a7bf427481a83639f82ffcec] 18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 Start:2018-10-26T20:36:18.983 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec} 18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 End:2018-10-26T20:36:18.990, Elapsed:1 & OutputMessage is: [7cad0cb2a7bf427481a83639f82ffcec] 18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 Start:2018-10-26T20:36:18.991 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec} 18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 End:2018-10-26T20:36:18.995, Elapsed:3 & OutputMessage is: {request_id=7cad0cb2a7bf427481a83639f82ffcec} 18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-3 Start:2018-10-26T20:36:18.995 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec} 18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-3 End:2018-10-26T20:36:18.996, Elapsed:1 & OutputMessage is: [7cad0cb2a7bf427481a83639f82ffcec] Please note that, this is not happening to all the records. Example, in QA where I am testing out of 16000 messages the above single record is duplicated twice(Kafka has only one message) and the rest are present only once in DB. Let me know in case you need any thing else. Once again, thanks for looking in. -- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ --------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org