[https://issues.apache.org/jira/browse/FLINK-18741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171876#comment-17171876]
mzz edited comment on FLINK-18741 at 8/6/20, 2:52 AM:
------------------------------------------------------
[~aljoscha]
Thanks for your reply.
When I use a ProcessWindowFunction and override its process method, I iterate over the elements iterator inside it, but the variable declared outside the loop ends up back at its initial value.
Specifically, I define a variable in process and assign it an initial value, then iterate over elements and add 1 to the variable for each matching element.
However, when the window ends, the variable has been reset to its initial value.
*code:*
{code:java}
elements.foreach(e => {
  if ("adreq".equals(e._3)) {
    requestNum += 1
    println(key._1, requestNum)
    // Printed here:
    // (key,1)
    // (key,2)
    // (key,3)
  }
})
// But printed after the foreach loop, always:
// (key,0)
println(requestNum, key._1)
{code}
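In plain Scala, a local `var` captured by the closure passed to `foreach` is shared with the enclosing method (the compiler wraps it in a mutable reference), so a value printed after the loop should already include every increment made inside it. The minimal sketch below runs without Flink and assumes simplified 3-field tuples in place of the real 9-field window elements; `CounterCaptureDemo`, `countWithVar` and `countWithCount` are hypothetical names used only for illustration. If the post-loop print still shows 0, one thing worth checking is whether it comes from a different `process` call (for example, a key whose window contains no `adreq` events) than the one that printed 1, 2, 3.

```scala
// Plain-Scala sketch (no Flink): a var mutated inside the closure given to
// foreach is the same variable the enclosing method reads afterwards.
object CounterCaptureDemo {
  // Hypothetical simplified element; only field 3 (the event type) matters here.
  type Event = (String, String, String)

  // Counts "adreq" events the way the snippet does: a var updated in foreach.
  def countWithVar(elements: Iterable[Event]): Long = {
    var requestNum: Long = 0L
    elements.foreach { e =>
      if ("adreq".equals(e._3)) requestNum += 1
    }
    requestNum // includes every increment made inside foreach
  }

  // Equivalent result without mutable state.
  def countWithCount(elements: Iterable[Event]): Long =
    elements.count(_._3 == "adreq").toLong

  def main(args: Array[String]): Unit = {
    val window: Iterable[Event] = Seq(
      ("k", "a", "adreq"),
      ("k", "a", "click"),
      ("k", "a", "adreq"),
      ("k", "a", "adreq")
    )
    println((countWithVar(window), countWithCount(window))) // prints (3,3)
  }
}
```

The `count` variant avoids the mutable variable altogether, which also removes any doubt about which value is printed after the loop.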
> ProcessWindowFunction's process function exception
> ---------------------------------------------------
>
> Key: FLINK-18741
> URL: https://issues.apache.org/jira/browse/FLINK-18741
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Critical
>
> I use a ProcessWindowFunction to calculate PV (page views), but in my
> overridden process method the user-defined state value cannot be returned.
> code:
> {code:java}
> // Imports assumed by this snippet (Flink 1.10 Scala API)
> import java.time.LocalDateTime
> import java.time.format.DateTimeFormatter
> import java.util.Date
> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.util.Collector
>
> // tem, CkResult, getSecondTimestampTwo, getZero_time, fill_rate, expose_rate
> // and click_rate are defined elsewhere in the job and are not shown here.
> tem.keyBy(x => (x._1, x._2, x._4, x._5, x._6, x._7, x._8))
>   .timeWindow(Time.seconds(15 * 60)) // 15-minute window
>   .process(new ProcessWindowFunction[
>       (String, String, String, String, String, String, String, String, String),
>       CkResult,
>       (String, String, String, String, String, String, String),
>       TimeWindow] {
>
>     var clickCount: ValueState[Long] = _
>     var requestCount: ValueState[Long] = _
>     var returnCount: ValueState[Long] = _
>     var videoCount: ValueState[Long] = _
>     var noVideoCount: ValueState[Long] = _
>
>     override def open(parameters: Configuration): Unit = {
>       clickCount = getRuntimeContext.getState(new ValueStateDescriptor("clickCount", classOf[Long]))
>       requestCount = getRuntimeContext.getState(new ValueStateDescriptor("requestCount", classOf[Long]))
>       returnCount = getRuntimeContext.getState(new ValueStateDescriptor("returnCount", classOf[Long]))
>       videoCount = getRuntimeContext.getState(new ValueStateDescriptor("videoCount", classOf[Long]))
>       noVideoCount = getRuntimeContext.getState(new ValueStateDescriptor("noVideoCount", classOf[Long]))
>     }
>
>     override def process(
>         key: (String, String, String, String, String, String, String),
>         context: Context,
>         elements: Iterable[(String, String, String, String, String, String, String, String, String)],
>         out: Collector[CkResult]): Unit = {
>       try {
>         var clickNum: Long = clickCount.value
>         val dateNow = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")).toLong
>         var requestNum: Long = requestCount.value
>         var returnNum: Long = returnCount.value
>         var videoNum: Long = videoCount.value
>         var noVideoNum: Long = noVideoCount.value
>         if (requestNum == null) { // never true: a Scala Long cannot be null
>           requestNum = 0
>         }
>
>         val ecpm = key._7.toDouble.formatted("%.2f").toFloat
>         val created_at = getSecondTimestampTwo(new Date)
>
>         // Relevant part: count "adreq" events and store the total in state
>         elements.foreach(e => {
>           if ("adreq".equals(e._3)) {
>             requestNum += 1
>             println(key._1, requestNum)
>           }
>         })
>         requestCount.update(requestNum)
>         println(requestNum, key._1)
>
>         out.collect(CkResult(dateNow, (created_at - getZero_time) / (60 * 15),
>           key._2, key._3, key._4, key._5, key._3 + "_" + key._4 + "_" + key._5,
>           key._6, key._1, requestCount.value, returnCount.value, fill_rate,
>           noVideoCount.value + videoCount.value,
>           expose_rate, clickCount.value, click_rate, ecpm,
>           // note: `/` binds tighter than `+`, so only the second product is divided by 1000
>           (noVideoCount.value * ecpm + videoCount.value * ecpm / 1000.toFloat).formatted("%.2f").toFloat,
>           created_at))
>       } catch {
>         case e: Exception => println(key, e)
>       }
>     }
>   })
> {code}
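A Flink `ValueState` that has never been updated returns `null` from `value()` (when the descriptor has no default value), and in Scala assigning that result to a `Long` unboxes the `null` to `0L`. So `requestNum` already starts at 0, and the `requestNum == null` check in the code above can never be true (the Scala 2 compiler typically warns that comparing a `Long` with `null` always yields false). The plain-Scala sketch below reproduces the unboxing without Flink; `StateLike` is a hypothetical stand-in for `ValueState[Long]`, not Flink's class.

```scala
// Plain-Scala sketch (no Flink) of how an unset generic value reads as a Long.
object NullUnboxingDemo {
  // Hypothetical stand-in for ValueState[T]: value() returns null until update() is called.
  final class StateLike[T] {
    private var current: T = null.asInstanceOf[T] // T is erased, so this stays null
    def value(): T = current
    def update(v: T): Unit = current = v
  }

  // Mirrors `var requestNum: Long = requestCount.value` from the snippet.
  def readAsLong(state: StateLike[Long]): Long = {
    val n: Long = state.value() // a null here is unboxed to 0L
    n
  }

  def main(args: Array[String]): Unit = {
    val state = new StateLike[Long]
    println(readAsLong(state)) // prints 0
    state.update(readAsLong(state) + 3L)
    println(readAsLong(state)) // prints 3
  }
}
```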
> {code:java}
> elements.foreach(e => {
>   if ("adreq".equals(e._3)) {
>     requestNum += 1
>     println(key._1, requestNum)
>     // Printed here:
>     // (key,1)
>     // (key,2)
>     // (key,3)
>   }
> })
> // But printed after the foreach loop, always:
> // (key,0)
> println(requestNum, key._1)
> {code}
> Can anyone help me, please? Thanks.
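When reading the printed values, it may help to keep in mind that state obtained through `getRuntimeContext.getState` inside a `ProcessWindowFunction` on a keyed stream is keyed state: it is scoped to the current key, not to the current window, so `requestCount` keeps accumulating across consecutive 15-minute windows of the same key. Per-window state is available separately through `context.windowState()`. The plain-Scala simulation below does not use Flink; the in-memory map and `processWindow` are hypothetical stand-ins that contrast a count restarting with every window against a per-key running total.

```scala
import scala.collection.mutable

// Plain-Scala simulation (no Flink). Assumption for illustration: `keyedState`
// mimics state from getRuntimeContext.getState (one value per key that survives
// across windows), while `perWindow` is recomputed from each window's elements.
object StateScopeDemo {
  // key -> running total, shared by all windows of that key
  private val keyedState = mutable.Map.empty[String, Long]

  // Processes one window for `key`; returns (perWindowCount, keyedRunningTotal).
  def processWindow(key: String, eventTypes: Seq[String]): (Long, Long) = {
    val perWindow = eventTypes.count(_ == "adreq").toLong
    val total = keyedState.getOrElse(key, 0L) + perWindow
    keyedState.update(key, total)
    (perWindow, total)
  }

  def main(args: Array[String]): Unit = {
    println(processWindow("k", Seq("adreq", "adreq", "click"))) // prints (2,2)
    println(processWindow("k", Seq("adreq")))                   // prints (1,3)
    println(processWindow("other", Seq("click")))               // prints (0,0)
  }
}
```

If a per-window PV count is what the result should carry, counting the window's elements directly (the first value of the pair) avoids mixing in totals from earlier windows.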
--
This message was sent by Atlassian Jira
(v8.3.4#803005)