[ 
https://issues.apache.org/jira/browse/FLINK-36149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luolei updated FLINK-36149:
---------------------------
    Description: 
{code:java}
select *
from 
( 
    SELECT *, ROW_NUMBER() OVER (PARTITION BY song_id, user_id, FLOOR(proc_time 
TO day) order by proc_time asc ) as row_num 
    from  tableA
    where  cmd = 1 and user_id > 0
) 
where row_num <=10  {code}
Currently, the deduplication operator uses the Flink State TTL mechanism. The 
default behavior of this mechanism is that expired states are only cleaned up 
when they are accessed again. In our case, the key in the Flink state includes 
the LOOR (proc_time TO day) timestamp. For example, if today is December 28th, 
the new keys in the Flink state will include December 28th. When it becomes 
December 29th, the keys for new records will include December 29th, and the 
keys from December 28th will never be accessed again. Since they are not 
accessed, they will not be cleaned up by the Flink State TTL mechanism. As a 
result, the state in Flink will increase indefinitely.

 

 
{code:java}
2021-02-25 06:49:25,593 WARN  akka.remote.transport.netty.NettyTransport        
           [] - Remote connection to [null] failed with 
java.net.ConnectException: Connection refused: 
hadoop02.tcd.com/9.44.33.8:608992021-02-25 06:49:25,593 WARN  
akka.remote.ReliableDeliverySupervisor                       [] - Association 
with remote system [akka.tcp://[email protected]:60899] has failed, 
address is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://[email protected]:60899]] Caused by: 
[java.net.ConnectException: Connection refused: 
hadoop02.tcd.com/9.44.33.8:60899]2021-02-25 06:49:32,762 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker container_e26_1614150721877_0021_01_000004 is terminated. Diagnostics: 
[2021-02-25 06:49:31.879]Container 
[pid=24324,containerID=container_e26_1614150721877_0021_01_000004] is running 
103702528B beyond the 'PHYSICAL' memory limit. Current usage: 4.1 GB of 4 GB 
physical memory used; 6.3 GB of 8.4 GB virtual memory used. Killing 
container.Dump of the process-tree for 
container_e26_1614150721877_0021_01_000004 :        |- PID PPID PGRPID SESSID 
CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) 
RSSMEM_USAGE(PAGES) FULL_CMD_LINE        |- 24551 24324 24324 24324 (java) 
1130639 94955 6799687680 1073522 /usr/java/jdk1.8.0_131/bin/java -Xmx1664299798 
-Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address=hadoop02.tcd.com 
-Djobmanager.memory.jvm-overhead.min=201326592b -Dpipeline.classpaths= 
-Dtaskmanager.resource-id=container_e26_1614150721877_0021_01_000004 
-Dweb.port=0 -Djobmanager.memory.off-heap.size=134217728b 
-Dexecution.target=embedded 
-Dweb.tmpdir=/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8 
-Dinternal.taskmanager.resource-id.metadata=hadoop03.tcd.com:8041 
-Djobmanager.rpc.port=54474 
-Dpipeline.jars=file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar
 -Drest.address=hadoop02.tcd.com 
-Djobmanager.memory.jvm-metaspace.size=268435456b 
-Djobmanager.memory.heap.size=1073741824b 
-Djobmanager.memory.jvm-overhead.max=201326592b        |- 24324 24315 24324 
24324 (bash) 1 0 11046912 372 /bin/bash -c /usr/java/jdk1.8.0_131/bin/java 
-Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 
-XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address='hadoop02.tcd.com' 
-Djobmanager.memory.jvm-overhead.min='201326592b' -Dpipeline.classpaths='' 
-Dtaskmanager.resource-id='container_e26_1614150721877_0021_01_000004' 
-Dweb.port='0' -Djobmanager.memory.off-heap.size='134217728b' 
-Dexecution.target='embedded' 
-Dweb.tmpdir='/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8' 
-Dinternal.taskmanager.resource-id.metadata='hadoop03.tcd.com:8041' 
-Djobmanager.rpc.port='54474' 
-Dpipeline.jars='file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar'
 -Drest.address='hadoop02.tcd.com' 
-Djobmanager.memory.jvm-metaspace.size='268435456b' 
-Djobmanager.memory.heap.size='1073741824b' 
-Djobmanager.memory.jvm-overhead.max='201326592b' 1> 
/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.out
 2> 
/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.err
[2021-02-25 06:49:31.896]Container killed on request. Exit code is 
143[2021-02-25 06:49:31.908]Container exited with a non-zero exit code 143. 
{code}
!image-2024-08-24-23-09-53-925.png!

 
!image-2024-08-24-23-10-29-444.png!
 

 

  was:
{code:java}
select *
from 
( 
    SELECT *, ROW_NUMBER() OVER (PARTITION BY song_id, user_id, FLOOR(proc_time 
TO day) order by proc_time asc ) as row_num 
    from  tableA
    where  cmd = 1 and user_id > 0
) 
where row_num <=10  {code}


> Support cleaning up expired states to prevent the continuous increase of 
> states.
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-36149
>                 URL: https://issues.apache.org/jira/browse/FLINK-36149
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: luolei
>            Priority: Major
>         Attachments: image-2024-08-24-23-09-53-925.png, 
> image-2024-08-24-23-10-29-444.png
>
>
> {code:java}
> select *
> from 
> ( 
>     SELECT *, ROW_NUMBER() OVER (PARTITION BY song_id, user_id, 
> FLOOR(proc_time TO day) order by proc_time asc ) as row_num 
>     from  tableA
>     where  cmd = 1 and user_id > 0
> ) 
> where row_num <=10  {code}
> Currently, the deduplication operator uses the Flink State TTL mechanism. The 
> default behavior of this mechanism is that expired states are only cleaned up 
> when they are accessed again. In our case, the key in the Flink state 
> includes the LOOR (proc_time TO day) timestamp. For example, if today is 
> December 28th, the new keys in the Flink state will include December 28th. 
> When it becomes December 29th, the keys for new records will include December 
> 29th, and the keys from December 28th will never be accessed again. Since 
> they are not accessed, they will not be cleaned up by the Flink State TTL 
> mechanism. As a result, the state in Flink will increase indefinitely.
>  
>  
> {code:java}
> 2021-02-25 06:49:25,593 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [null] failed with 
> java.net.ConnectException: Connection refused: 
> hadoop02.tcd.com/9.44.33.8:608992021-02-25 06:49:25,593 WARN  
> akka.remote.ReliableDeliverySupervisor                       [] - Association 
> with remote system [akka.tcp://[email protected]:60899] has failed, 
> address is now gated for [50] ms. Reason: [Association failed with 
> [akka.tcp://[email protected]:60899]] Caused by: 
> [java.net.ConnectException: Connection refused: 
> hadoop02.tcd.com/9.44.33.8:60899]2021-02-25 06:49:32,762 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Worker container_e26_1614150721877_0021_01_000004 is terminated. Diagnostics: 
> [2021-02-25 06:49:31.879]Container 
> [pid=24324,containerID=container_e26_1614150721877_0021_01_000004] is running 
> 103702528B beyond the 'PHYSICAL' memory limit. Current usage: 4.1 GB of 4 GB 
> physical memory used; 6.3 GB of 8.4 GB virtual memory used. Killing 
> container.Dump of the process-tree for 
> container_e26_1614150721877_0021_01_000004 :        |- PID PPID PGRPID SESSID 
> CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) 
> RSSMEM_USAGE(PAGES) FULL_CMD_LINE        |- 24551 24324 24324 24324 (java) 
> 1130639 94955 6799687680 1073522 /usr/java/jdk1.8.0_131/bin/java 
> -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 
> -XX:MaxMetaspaceSize=268435456 
> -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log
>  -Dlog4j.configuration=file:./log4j.properties 
> -Dlog4j.configurationFile=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner -D 
> taskmanager.memory.framework.off-heap.size=134217728b -D 
> taskmanager.memory.network.max=359703515b -D 
> taskmanager.memory.network.min=359703515b -D 
> taskmanager.memory.framework.heap.size=134217728b -D 
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
> taskmanager.memory.task.heap.size=1530082070b -D 
> taskmanager.memory.task.off-heap.size=0b --configDir . 
> -Djobmanager.rpc.address=hadoop02.tcd.com 
> -Djobmanager.memory.jvm-overhead.min=201326592b -Dpipeline.classpaths= 
> -Dtaskmanager.resource-id=container_e26_1614150721877_0021_01_000004 
> -Dweb.port=0 -Djobmanager.memory.off-heap.size=134217728b 
> -Dexecution.target=embedded 
> -Dweb.tmpdir=/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8 
> -Dinternal.taskmanager.resource-id.metadata=hadoop03.tcd.com:8041 
> -Djobmanager.rpc.port=54474 
> -Dpipeline.jars=file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar
>  -Drest.address=hadoop02.tcd.com 
> -Djobmanager.memory.jvm-metaspace.size=268435456b 
> -Djobmanager.memory.heap.size=1073741824b 
> -Djobmanager.memory.jvm-overhead.max=201326592b        |- 24324 24315 24324 
> 24324 (bash) 1 0 11046912 372 /bin/bash -c /usr/java/jdk1.8.0_131/bin/java 
> -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 
> -XX:MaxMetaspaceSize=268435456 
> -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log
>  -Dlog4j.configuration=file:./log4j.properties 
> -Dlog4j.configurationFile=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner -D 
> taskmanager.memory.framework.off-heap.size=134217728b -D 
> taskmanager.memory.network.max=359703515b -D 
> taskmanager.memory.network.min=359703515b -D 
> taskmanager.memory.framework.heap.size=134217728b -D 
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
> taskmanager.memory.task.heap.size=1530082070b -D 
> taskmanager.memory.task.off-heap.size=0b --configDir . 
> -Djobmanager.rpc.address='hadoop02.tcd.com' 
> -Djobmanager.memory.jvm-overhead.min='201326592b' -Dpipeline.classpaths='' 
> -Dtaskmanager.resource-id='container_e26_1614150721877_0021_01_000004' 
> -Dweb.port='0' -Djobmanager.memory.off-heap.size='134217728b' 
> -Dexecution.target='embedded' 
> -Dweb.tmpdir='/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8' 
> -Dinternal.taskmanager.resource-id.metadata='hadoop03.tcd.com:8041' 
> -Djobmanager.rpc.port='54474' 
> -Dpipeline.jars='file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar'
>  -Drest.address='hadoop02.tcd.com' 
> -Djobmanager.memory.jvm-metaspace.size='268435456b' 
> -Djobmanager.memory.heap.size='1073741824b' 
> -Djobmanager.memory.jvm-overhead.max='201326592b' 1> 
> /data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.out
>  2> 
> /data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.err
> [2021-02-25 06:49:31.896]Container killed on request. Exit code is 
> 143[2021-02-25 06:49:31.908]Container exited with a non-zero exit code 143. 
> {code}
> !image-2024-08-24-23-09-53-925.png!
>  
> !image-2024-08-24-23-10-29-444.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to