[ https://issues.apache.org/jira/browse/FLINK-18741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-18741: ----------------------------------- Labels: auto-deprioritized-critical (was: stale-critical) > ProcessWindowFunction's process function exception > --------------------------------------------------- > > Key: FLINK-18741 > URL: https://issues.apache.org/jira/browse/FLINK-18741 > Project: Flink > Issue Type: Bug > Components: API / DataStream > Affects Versions: 1.10.0 > Reporter: mzz > Priority: Critical > Labels: auto-deprioritized-critical > > I use ProcessWindowFunction to implement PV calculation, but when overriding > process, the user-defined state value cannot be returned. > code: > {code:java} > tem.keyBy(x => > (x._1, x._2, x._4, x._5, x._6, x._7, x._8)) > .timeWindow(Time.seconds(15 * 60)) //15 min window > .process(new ProcessWindowFunction[(String, String, String, String, > String, String, String, String, String), CkResult, (String, String, String, > String, String, String, String), TimeWindow] { > var clickCount: ValueState[Long] = _ > * var requestCount: ValueState[Long] = _ > * var returnCount: ValueState[Long] = _ > var videoCount: ValueState[Long] = _ > var noVideoCount: ValueState[Long] = _ > override def open(parameters: Configuration): Unit = { > clickCount = getRuntimeContext.getState(new > ValueStateDescriptor("clickCount", classOf[Long])) > * requestCount = getRuntimeContext.getState(new > ValueStateDescriptor("requestCount", classOf[Long]))* > returnCount = getRuntimeContext.getState(new > ValueStateDescriptor("returnCount", classOf[Long])) > videoCount = getRuntimeContext.getState(new > ValueStateDescriptor("videoCount", classOf[Long])) > noVideoCount = getRuntimeContext.getState(new > ValueStateDescriptor("noVideoCount", classOf[Long])) > } > override def process(key: (String, String, String, String, String, > String, String), context: Context, elements: Iterable[(String, String, > String, String, String, String, String, String, String)], out: > Collector[CkResult]) = { > try { > var clickNum: Long = 
clickCount.value > val dateNow = > LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")).toLong > var requestNum: Long = requestCount.value > var returnNum: Long = returnCount.value > var videoNum: Long = videoCount.value > var noVideoNum: Long = noVideoCount.value > if (requestNum == null) { > requestNum = 0 > } > > val ecpm = key._7.toDouble.formatted("%.2f").toFloat > val created_at = getSecondTimestampTwo(new Date) > > * elements.foreach(e => { > if ("adreq".equals(e._3)) { > requestNum += 1 > println(key._1, requestNum) > } > }) > requestCount.update(requestNum) > println(requestNum, key._1)* > > out.collect(CkResult(dateNow, (created_at - getZero_time) / (60 * > 15), key._2, key._3, key._4, key._5, key._3 + "_" + key._4 + "_" + key._5, > key._6, key._1, requestCount.value, returnCount.value, fill_rate, > noVideoCount.value + videoCount.value, > expose_rate, clickCount.value, click_rate, ecpm, > (noVideoCount.value * ecpm + videoCount.value * ecpm / > 1000.toFloat).formatted("%.2f").toFloat, created_at)) > } > catch { > case e: Exception => println(key, e) > } > } > }) > {code} > {code:java} > elements.foreach(e => { > if ("adreq".equals(e._3)) { > requestNum += 1 > println(key._1, requestNum) > // The values printed here like : > //(key,1) > //(key,2) > //(key,3) > } > }) > //But print outside the for loop always like : > //(key,0) > println(requestNum, key._1) > {code} > Could someone please help me with this? Thanks. -- This message was sent by Atlassian Jira (v8.3.4#803005)