[
https://issues.apache.org/jira/browse/FLINK-18741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-18741:
-----------------------------------
Labels: auto-deprioritized-critical auto-deprioritized-major stale-minor
(was: auto-deprioritized-critical auto-deprioritized-major)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issues has been marked as
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is
still Minor, please either assign yourself or give an update. Afterwards,
please remove the label or in 7 days the issue will be deprioritized.
> ProcessWindowFunction's process function exception
> ---------------------------------------------------
>
> Key: FLINK-18741
> URL: https://issues.apache.org/jira/browse/FLINK-18741
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Minor
> Labels: auto-deprioritized-critical, auto-deprioritized-major,
> stale-minor
>
> I use ProcessWindowFunction to achieve PV calculation, but when rewriting
> process, the user-defined state value cannot be returned。
> code:
> {code:java}
> tem.keyBy(x =>
> (x._1, x._2, x._4, x._5, x._6, x._7, x._8))
> .timeWindow(Time.seconds(15 * 60)) //15 min window
> .process(new ProcessWindowFunction[(String, String, String, String,
> String, String, String, String, String), CkResult, (String, String, String,
> String, String, String, String), TimeWindow] {
> var clickCount: ValueState[Long] = _
> * var requestCount: ValueState[Long] = _
> * var returnCount: ValueState[Long] = _
> var videoCount: ValueState[Long] = _
> var noVideoCount: ValueState[Long] = _
> override def open(parameters: Configuration): Unit = {
> clickCount = getRuntimeContext.getState(new
> ValueStateDescriptor("clickCount", classOf[Long]))
> * requestCount = getRuntimeContext.getState(new
> ValueStateDescriptor("requestCount", classOf[Long]))*
> returnCount = getRuntimeContext.getState(new
> ValueStateDescriptor("returnCount", classOf[Long]))
> videoCount = getRuntimeContext.getState(new
> ValueStateDescriptor("videoCount", classOf[Long]))
> noVideoCount = getRuntimeContext.getState(new
> ValueStateDescriptor("noVideoCount", classOf[Long]))
> }
> override def process(key: (String, String, String, String, String,
> String, String), context: Context, elements: Iterable[(String, String,
> String, String, String, String, String, String, String)], out:
> Collector[CkResult]) = {
> try {
> var clickNum: Long = clickCount.value
> val dateNow =
> LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")).toLong
> var requestNum: Long = requestCount.value
> var returnNum: Long = returnCount.value
> var videoNum: Long = videoCount.value
> var noVideoNum: Long = noVideoCount.value
> if (requestNum == null) {
> requestNum = 0
> }
>
> val ecpm = key._7.toDouble.formatted("%.2f").toFloat
> val created_at = getSecondTimestampTwo(new Date)
>
> * elements.foreach(e => {
> if ("adreq".equals(e._3)) {
> requestNum += 1
> println(key._1, requestNum)
> }
> })
> requestCount.update(requestNum)
> println(requestNum, key._1)*
>
> out.collect(CkResult(dateNow, (created_at - getZero_time) / (60 *
> 15), key._2, key._3, key._4, key._5, key._3 + "_" + key._4 + "_" + key._5,
> key._6, key._1, requestCount.value, returnCount.value, fill_rate,
> noVideoCount.value + videoCount.value,
> expose_rate, clickCount.value, click_rate, ecpm,
> (noVideoCount.value * ecpm + videoCount.value * ecpm /
> 1000.toFloat).formatted("%.2f").toFloat, created_at))
> }
> catch {
> case e: Exception => println(key, e)
> }
> }
> })
> {code}
> {code:java}
> elements.foreach(e => {
> if ("adreq".equals(e._3)) {
> requestNum += 1
> println(key._1, requestNum)
> // The values printed here like :
> //(key,1)
> //(key,2)
> //(key,3)
> }
> })
> //But print outside the for loop always like :
> //(key,0)
> println(requestNum, key._1)
> {code}
> who can help me ,plz thx。
--
This message was sent by Atlassian Jira
(v8.20.1#820001)