[
https://issues.apache.org/jira/browse/FLINK-36149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-36149:
-----------------------------------
Labels: pull-request-available (was: )
> Support cleaning up expired states to prevent the continuous increase of
> states and add RocksDB state cleanup configuration.
> ----------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-36149
> URL: https://issues.apache.org/jira/browse/FLINK-36149
> Project: Flink
> Issue Type: Improvement
> Reporter: luolei
> Priority: Major
> Labels: pull-request-available
> Attachments: 1724512324453.jpg, 1724512362249.jpg
>
>
> 1. Problem description:
> {code:sql}
> SELECT *
> FROM (
>     SELECT *,
>            ROW_NUMBER() OVER (
>                PARTITION BY song_id, user_id, FLOOR(proc_time TO DAY)
>                ORDER BY proc_time ASC
>            ) AS row_num
>     FROM tableA
>     WHERE cmd = 1 AND user_id > 0
> )
> WHERE row_num <= 10
> {code}
> Currently, the Rank (Top-N) operator that executes this query relies on the
> Flink State TTL mechanism. By default, this mechanism removes an expired
> state entry only when that entry is accessed again. In our case, the key in
> the Flink state includes the FLOOR(proc_time TO day) value. For example, if
> today is December 28th, new keys in the Flink state include December 28th.
> Once it is December 29th, the keys for new records include December 29th,
> and the keys from December 28th are never accessed again. Because they are
> never accessed, the Flink State TTL mechanism never cleans them up, and the
> state in Flink grows indefinitely.
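The growth described above can be reproduced with a toy model in plain Java. This is only a sketch of on-access expiry, not Flink's implementation: the `LazyTtlStore` class, the key layout, and the one-day TTL are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class LazyTtlSketch {
    static final long DAY = 24L * 60 * 60 * 1000;

    /** Hypothetical keyed store: entries expire after ttlMillis but are removed only when read. */
    static final class LazyTtlStore {
        private final long ttlMillis;
        // key -> {value, lastWriteTime}
        private final Map<String, long[]> entries = new HashMap<>();

        LazyTtlStore(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        void put(String key, long value, long now) {
            entries.put(key, new long[] {value, now});
        }

        /** Returns the value, or null if absent or expired. Expired entries are dropped only here. */
        Long get(String key, long now) {
            long[] entry = entries.get(key);
            if (entry == null) {
                return null;
            }
            if (now - entry[1] >= ttlMillis) {
                entries.remove(key);
                return null;
            }
            return entry[0];
        }

        /** Number of entries physically held, expired or not. */
        int physicalSize() {
            return entries.size();
        }
    }

    /** Writes one record per day; each key embeds its day, so old keys are never read again. */
    static LazyTtlStore simulate(int days) {
        LazyTtlStore store = new LazyTtlStore(DAY);
        for (int day = 0; day < days; day++) {
            String key = "song=1,user=42,day=" + day;
            long now = day * DAY;
            store.get(key, now);      // the operator reads the state for the current key
            store.put(key, 1L, now);  // and then updates it
        }
        return store;
    }

    public static void main(String[] args) {
        LazyTtlStore store = simulate(30);
        System.out.println("entries held after 30 days: " + store.physicalSize());
    }
}
```

In this model all 30 entries are still held after 30 days, although 29 of them are past their TTL, because no later record ever reads a key from an earlier day.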
>
>
> {code:java}
> 2021-02-25 06:49:25,593 WARN akka.remote.transport.netty.NettyTransport
> [] - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused:
> hadoop02.tcd.com/9.44.33.8:60899
> 2021-02-25 06:49:25,593 WARN
> akka.remote.ReliableDeliverySupervisor [] - Association
> with remote system [akka.tcp://[email protected]:60899] has failed,
> address is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://[email protected]:60899]] Caused by:
> [java.net.ConnectException: Connection refused:
> hadoop02.tcd.com/9.44.33.8:60899]
> 2021-02-25 06:49:32,762 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker container_e26_1614150721877_0021_01_000004 is terminated. Diagnostics:
> [2021-02-25 06:49:31.879]Container
> [pid=24324,containerID=container_e26_1614150721877_0021_01_000004] is running
> 103702528B beyond the 'PHYSICAL' memory limit. Current usage: 4.1 GB of 4 GB
> physical memory used; 6.3 GB of 8.4 GB virtual memory used. Killing
> container.
> Dump of the process-tree for container_e26_1614150721877_0021_01_000004 :
> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> |- 24551 24324 24324 24324 (java)
> 1130639 94955 6799687680 1073522 /usr/java/jdk1.8.0_131/bin/java
> -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243
> -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties
> -Dlog4j.configurationFile=file:./log4j.properties
> org.apache.flink.yarn.YarnTaskExecutorRunner -D
> taskmanager.memory.framework.off-heap.size=134217728b -D
> taskmanager.memory.network.max=359703515b -D
> taskmanager.memory.network.min=359703515b -D
> taskmanager.memory.framework.heap.size=134217728b -D
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D
> taskmanager.memory.task.heap.size=1530082070b -D
> taskmanager.memory.task.off-heap.size=0b --configDir .
> -Djobmanager.rpc.address=hadoop02.tcd.com
> -Djobmanager.memory.jvm-overhead.min=201326592b -Dpipeline.classpaths=
> -Dtaskmanager.resource-id=container_e26_1614150721877_0021_01_000004
> -Dweb.port=0 -Djobmanager.memory.off-heap.size=134217728b
> -Dexecution.target=embedded
> -Dweb.tmpdir=/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8
> -Dinternal.taskmanager.resource-id.metadata=hadoop03.tcd.com:8041
> -Djobmanager.rpc.port=54474
> -Dpipeline.jars=file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar
> -Drest.address=hadoop02.tcd.com
> -Djobmanager.memory.jvm-metaspace.size=268435456b
> -Djobmanager.memory.heap.size=1073741824b
> -Djobmanager.memory.jvm-overhead.max=201326592b
> |- 24324 24315 24324 24324 (bash) 1 0 11046912 372 /bin/bash -c /usr/java/jdk1.8.0_131/bin/java
> -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243
> -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties
> -Dlog4j.configurationFile=file:./log4j.properties
> org.apache.flink.yarn.YarnTaskExecutorRunner -D
> taskmanager.memory.framework.off-heap.size=134217728b -D
> taskmanager.memory.network.max=359703515b -D
> taskmanager.memory.network.min=359703515b -D
> taskmanager.memory.framework.heap.size=134217728b -D
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D
> taskmanager.memory.task.heap.size=1530082070b -D
> taskmanager.memory.task.off-heap.size=0b --configDir .
> -Djobmanager.rpc.address='hadoop02.tcd.com'
> -Djobmanager.memory.jvm-overhead.min='201326592b' -Dpipeline.classpaths=''
> -Dtaskmanager.resource-id='container_e26_1614150721877_0021_01_000004'
> -Dweb.port='0' -Djobmanager.memory.off-heap.size='134217728b'
> -Dexecution.target='embedded'
> -Dweb.tmpdir='/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8'
> -Dinternal.taskmanager.resource-id.metadata='hadoop03.tcd.com:8041'
> -Djobmanager.rpc.port='54474'
> -Dpipeline.jars='file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar'
> -Drest.address='hadoop02.tcd.com'
> -Djobmanager.memory.jvm-metaspace.size='268435456b'
> -Djobmanager.memory.heap.size='1073741824b'
> -Djobmanager.memory.jvm-overhead.max='201326592b' 1>
> /data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.out
> 2>
> /data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.err
> [2021-02-25 06:49:31.896]Container killed on request. Exit code is 143
> [2021-02-25 06:49:31.908]Container exited with a non-zero exit code 143.
> {code}
>
> 2. Solution:
> 2.1 The Flink State TTL mechanism provides the {{cleanupFullSnapshot}} and
> {{cleanupInRocksdbCompactFilter}} cleanup strategies, which remove expired
> states even if they are never accessed again.
> * {{cleanupFullSnapshot}}: Excludes expired states when a full snapshot is
> taken, so they are no longer present after the job is restored from that
> snapshot.
> * {{cleanupInRocksdbCompactFilter}}: Accepts the
> {{queryTimeAfterNumEntries}} parameter, which sets how many state entries
> the compaction filter processes before it refreshes the current timestamp.
> When RocksDB runs compaction in the background, the filter uses this
> timestamp to decide whether a state entry has expired and drops expired keys
> and values. A lower {{queryTimeAfterNumEntries}} value speeds up state
> cleanup, but because the timestamp is fetched from Flink through JNI,
> frequent refreshes add significant overhead.
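The trade-off behind queryTimeAfterNumEntries can be sketched with a small plain-Java model. It is not the RocksDB compaction filter or Flink's code: `compact`, `CountingClock`, and the sample values are hypothetical, and the clock call count stands in for the JNI round trips.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongSupplier;

public class CompactionFilterSketch {
    /** Hypothetical clock that counts how often it is queried (a stand-in for a JNI call). */
    static final class CountingClock implements LongSupplier {
        private final long now;
        int calls = 0;

        CountingClock(long now) {
            this.now = now;
        }

        @Override
        public long getAsLong() {
            calls++;
            return now;
        }
    }

    /**
     * Keeps only entries whose last-write time is still within the TTL.
     * The cached timestamp is refreshed after every queryTimeAfterNumEntries entries.
     */
    static List<Long> compact(List<Long> lastWriteTimes, long ttl,
                              long queryTimeAfterNumEntries, LongSupplier clock) {
        if (queryTimeAfterNumEntries < 1) {
            throw new IllegalArgumentException("queryTimeAfterNumEntries must be >= 1");
        }
        List<Long> survivors = new ArrayList<>();
        long cachedNow = 0;
        long sinceRefresh = queryTimeAfterNumEntries; // forces a refresh on the first entry
        for (long lastWrite : lastWriteTimes) {
            if (sinceRefresh >= queryTimeAfterNumEntries) {
                cachedNow = clock.getAsLong();
                sinceRefresh = 0;
            }
            sinceRefresh++;
            if (cachedNow - lastWrite < ttl) {
                survivors.add(lastWrite); // not expired yet, keep it
            }
        }
        return survivors;
    }

    public static void main(String[] args) {
        List<Long> lastWriteTimes = Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
        for (long n : new long[] {1, 5, 1000}) {
            CountingClock clock = new CountingClock(100L);
            List<Long> survivors = compact(lastWriteTimes, 95L, n, clock);
            System.out.println("queryTimeAfterNumEntries=" + n
                    + " clockCalls=" + clock.calls + " survivors=" + survivors.size());
        }
    }
}
```

With a fixed clock, the surviving entries are the same for every setting and only the number of clock queries changes. In a real job the clock advances, so a larger value also means that expiry decisions are made against an older timestamp and cleanup lags behind.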
> 2.2 Add RocksDB state cleanup configuration to the Rank operators.
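For reference, here is a sketch of how both cleanup strategies are enabled on a state descriptor through the public DataStream API. It does not show the change made in the pull request. The class name, the descriptor name "rank-state", the one-day TTL, and the value 1000 are assumptions. The `Duration` overload of `newBuilder` is assumed to be available (recent Flink releases); older releases take `org.apache.flink.api.common.time.Time` instead.

```java
import java.time.Duration;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;

public class TtlCleanupConfigSketch {
    public static void main(String[] args) {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Duration.ofDays(1))            // assumed TTL of one day
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .cleanupFullSnapshot()                     // drop expired entries from full snapshots
                .cleanupInRocksdbCompactFilter(1000L)      // refresh the timestamp every 1000 entries
                .build();

        // Hypothetical descriptor; a Rank operator would use its own state descriptors.
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("rank-state", Long.class);
        descriptor.enableTimeToLive(ttlConfig);
    }
}
```

Note that cleanupFullSnapshot has no effect with incremental checkpoints in the RocksDB state backend, so for RocksDB jobs the compaction filter is the strategy that actually reclaims local state.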
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)