[ 
https://issues.apache.org/jira/browse/FLINK-18741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-18741:
-----------------------------------
    Labels: auto-deprioritized-critical  (was: stale-critical)

> ProcessWindowFunction's  process function exception
> ---------------------------------------------------
>
>                 Key: FLINK-18741
>                 URL: https://issues.apache.org/jira/browse/FLINK-18741
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.10.0
>            Reporter: mzz
>            Priority: Critical
>              Labels: auto-deprioritized-critical
>
> I use ProcessWindowFunction to achieve PV calculation, but when rewriting 
> process, the user-defined state value cannot be returned。
> code:
> {code:java}
> tem.keyBy(x =>
>       (x._1, x._2, x._4, x._5, x._6, x._7, x._8))
>       .timeWindow(Time.seconds(15 * 60)) //15 min window
>       .process(new ProcessWindowFunction[(String, String, String, String, 
> String, String, String, String, String), CkResult, (String, String, String, 
> String, String, String, String), TimeWindow] {
>       var clickCount: ValueState[Long] = _
> *      var requestCount: ValueState[Long] = _
> *      var returnCount: ValueState[Long] = _
>       var videoCount: ValueState[Long] = _
>       var noVideoCount: ValueState[Long] = _
>       override def open(parameters: Configuration): Unit = {
>         clickCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("clickCount", classOf[Long]))
>        * requestCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("requestCount", classOf[Long]))*
>         returnCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("returnCount", classOf[Long]))
>         videoCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("videoCount", classOf[Long]))
>         noVideoCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("noVideoCount", classOf[Long]))
>       }
>       override def process(key: (String, String, String, String, String, 
> String, String), context: Context, elements: Iterable[(String, String, 
> String, String, String, String, String, String, String)], out: 
> Collector[CkResult]) = {
>         try {
>           var clickNum: Long = clickCount.value
>           val dateNow = 
> LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")).toLong
>           var requestNum: Long = requestCount.value
>           var returnNum: Long = returnCount.value
>           var videoNum: Long = videoCount.value
>           var noVideoNum: Long = noVideoCount.value
>           if (requestNum == null) {
>             requestNum = 0
>           }
>           
>           val ecpm = key._7.toDouble.formatted("%.2f").toFloat
>           val created_at = getSecondTimestampTwo(new Date)
>          
> *          elements.foreach(e => {
>             if ("adreq".equals(e._3)) {
>               requestNum += 1
>               println(key._1, requestNum)
>             }
>           })
>           requestCount.update(requestNum)
>           println(requestNum, key._1)*
>   
>           out.collect(CkResult(dateNow, (created_at - getZero_time) / (60 * 
> 15), key._2, key._3, key._4, key._5, key._3 + "_" + key._4 + "_" + key._5, 
> key._6, key._1, requestCount.value, returnCount.value, fill_rate, 
> noVideoCount.value + videoCount.value,
>             expose_rate, clickCount.value, click_rate, ecpm, 
> (noVideoCount.value * ecpm + videoCount.value * ecpm / 
> 1000.toFloat).formatted("%.2f").toFloat, created_at))
>         }
>         catch {
>           case e: Exception => println(key, e)
>         }
>       }
>     })
> {code}
> {code:java}
>           elements.foreach(e => {
>             if ("adreq".equals(e._3)) {
>               requestNum += 1
>               println(key._1, requestNum)
> // The values printed here like :
> //(key,1)
> //(key,2)
> //(key,3)
>             }
>           })
> //But print outside the for loop always like :
> //(key,0)
>           println(requestNum, key._1)
> {code}
> who can help me ,plz thx。



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to