[
https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851292#comment-17851292
]
Roman Khachatryan commented on FLINK-35502:
-------------------------------------------
[~yingz], could you clarify the use case that requires setting
state.checkpoints.num-retained to 1500?
Most of the checkpoint data should already be stored on the (distributed) file
system, and etcd/zk is only used to store the pointer, so the load on etcd/zk
shouldn't be high in normal cases.
> compress the checkpoint metadata generated by ZK/ETCD HA Services
> -----------------------------------------------------------------
>
> Key: FLINK-35502
> URL: https://issues.apache.org/jira/browse/FLINK-35502
> Project: Flink
> Issue Type: Improvement
> Reporter: Ying Z
> Priority: Major
>
> In the implementation of Flink HA, checkpoint metadata is stored in
> either ZooKeeper (ZK HA) or ETCD (K8S HA), for example:
> {code:java}
> checkpointID-0000000000000036044: xxxx
> checkpointID-0000000000000036045: xxxx
> ...
> ... {code}
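Judging from the keys above, each entry appears to be named with a fixed "checkpointID-" prefix followed by a 19-digit zero-padded checkpoint ID. A minimal Java sketch of that naming, assuming exactly this format (CheckpointKeys, toKey and fromKey are hypothetical names, not Flink's API), shows one benefit of fixed-width padding: sorting the keys as plain strings yields numeric checkpoint order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper mirroring the key shape seen in the HA store:
// "checkpointID-" + checkpoint ID zero-padded to 19 digits.
// This is an illustration, not Flink's own implementation.
class CheckpointKeys {
    static final String PREFIX = "checkpointID-";

    // 19 digits are enough for any non-negative long (Long.MAX_VALUE has 19).
    static String toKey(long checkpointId) {
        return PREFIX + String.format("%019d", checkpointId);
    }

    // Parses the numeric ID back out of a key; leading zeros are accepted.
    static long fromKey(String key) {
        if (!key.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a checkpoint key: " + key);
        }
        return Long.parseLong(key.substring(PREFIX.length()));
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        keys.add(toKey(36045));
        keys.add(toKey(9));
        keys.add(toKey(36044));
        // Every key has the same width, so string order equals numeric order.
        Collections.sort(keys);
        System.out.println(keys);
    }
}
```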
> However, neither is designed to store large amounts of data. If the
> [state.checkpoints.num-retained|https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained]
> setting is too large, it can easily cause failures in ZK/ETCD.
> The error log when state.checkpoints.num-retained is set to 1500:
> {code:java}
> Caused by: org.apache.flink.util.SerializedThrowable:
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing:
> PUT at:
> https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader.
> Message: ConfigMap "xxx-jobmanager-leader" is invalid: []:
> Too long: must have at most 1048576 bytes. Received status:
> Status(apiVersion=v1, code=422,
> details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must
> have at most 1048576 bytes, reason=FieldValueTooLong,
> additionalProperties={})], group=null, kind=ConfigMap,
> name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null,
> additionalProperties={}), kind=Status, message=ConfigMap
> "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576
> bytes, metadata=ListMeta(_continue=null, remainingItemCount=null,
> resourceVersion=null, selfLink=null, additionalProperties={}),
> reason=Invalid, status=Failure, additionalProperties={}). {code}
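The numbers in this log allow a rough estimate. The Java sketch below (ConfigMapBudget and its methods are hypothetical, not a Flink or fabric8 API) sums the UTF-8 sizes of keys and values in a map and compares the total with the 1048576-byte limit reported in the error. It assumes that limit applies to the ConfigMap's data as a whole; the API server's exact accounting may differ, so treat the result as an approximation. It also shows that if 1500 entries overflow 1048576 bytes, each entry averaged roughly 700 bytes or more, ignoring any non-checkpoint keys stored in the same ConfigMap.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical helper for estimating ConfigMap payload size; not a Flink
// or fabric8 API. The Kubernetes API server performs the authoritative check.
class ConfigMapBudget {
    // Limit reported in the error message (1 MiB).
    static final long MAX_CONFIGMAP_BYTES = 1_048_576L;

    // Approximate payload: UTF-8 length of every key plus every value.
    static long dataBytes(Map<String, String> data) {
        long total = 0;
        for (Map.Entry<String, String> e : data.entrySet()) {
            total += e.getKey().getBytes(StandardCharsets.UTF_8).length;
            total += e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }

    static boolean fits(Map<String, String> data) {
        return dataBytes(data) <= MAX_CONFIGMAP_BYTES;
    }

    // If `entries` entries together exceed the limit, their average size
    // must be more than this many bytes (integer division rounds down).
    static long minAverageEntryBytes(int entries) {
        return MAX_CONFIGMAP_BYTES / entries;
    }

    public static void main(String[] args) {
        System.out.println(minAverageEntryBytes(1500)); // prints 699
    }
}
```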
> In Flink's code, all checkpoint metadata entries are updated at the same
> time, and the metadata contains many repeated bytes, so it achieves a very
> good compression ratio.
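To illustrate why bytes repeated across entries matter, here is a self-contained Java sketch, not taken from Flink, that builds synthetic JSON-like metadata for 1500 entries (the count from the log above) in which every value shares one pseudo-random block, then gzips it. The entry layout and values are assumptions made for the demo; a single block is hard to compress, but its repetition across entries is exactly what gzip exploits. The printed ratio says nothing about real Flink metadata.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.zip.GZIPOutputStream;

// Illustrative only: the entry layout and value bytes are invented; real
// checkpoint metadata will compress by a different ratio.
class CompressionDemo {
    private static final String ALPHABET =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // One pseudo-random block shared by every entry: poorly compressible on
    // its own, but repeated verbatim across entries.
    private static final String SHARED_VALUE = randomText(576, 42L);

    private static String randomText(int length, long seed) {
        Random random = new Random(seed);
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(ALPHABET.charAt(random.nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }

    // Builds a JSON-like object with one entry per retained checkpoint.
    static String syntheticMetadata(int entries) {
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < entries; i++) {
            long id = 36044L + i;
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(String.format("checkpointID-%019d", id)).append("\":\"")
              .append(SHARED_VALUE).append(id).append('"');
        }
        return sb.append('}').toString();
    }

    // Gzips a byte array in memory.
    static byte[] gzip(byte[] input) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(input);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] raw = syntheticMetadata(1500).getBytes(StandardCharsets.UTF_8);
        byte[] compressed = gzip(raw);
        System.out.printf("raw=%d bytes, gzip=%d bytes%n", raw.length, compressed.length);
    }
}
```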
> Therefore, I suggest compressing the data when writing checkpoints and
> decompressing it when reading, to reduce storage pressure and improve IO
> efficiency.
> Here is sample code; it reduces the metadata size from 1M bytes to 30K bytes.
> {code:java}
> // Map -> JSON
> ObjectMapper objectMapper = new ObjectMapper();
> String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
> // Compress and Base64-encode
> String compressedBase64 = compressAndEncode(checkpointJson);
> compressedData.put("checkpoint-all", compressedBase64);{code}
> {code:java}
> // Gzip-compresses a UTF-8 string and returns the result as Base64 text.
> private static String compressAndEncode(String data) throws IOException {
>     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
>     // try-with-resources closes the GZIP stream, which writes the gzip
>     // trailer before the buffer is read below.
>     try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
>         gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
>     }
>     byte[] compressedData = outputStream.toByteArray();
>     return Base64.getEncoder().encodeToString(compressedData);
> } {code}
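The proposal only shows the write path. A matching read path could look like the following Java sketch; MetadataCodec and decodeAndDecompress are hypothetical names, not existing Flink code, and compressAndEncode repeats the proposed logic (with IOException wrapped) so the example compiles on its own. A round trip returns the original string.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical codec pairing the proposed write path with a read path.
class MetadataCodec {
    // Write path, same steps as the proposal: UTF-8 -> gzip -> Base64.
    static String compressAndEncode(String data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(data.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(out.toByteArray());
    }

    // Read path (hypothetical): Base64 -> gunzip -> UTF-8 string.
    static String decodeAndDecompress(String encoded) {
        byte[] compressed = Base64.getDecoder().decode(encoded);
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return new String(gzip.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String json = "{\"checkpointID-0000000000000036044\":\"xxxx\"}";
        String stored = compressAndEncode(json);
        System.out.println(decodeAndDecompress(stored).equals(json)); // prints true
    }
}
```

Note that Base64 turns the binary gzip output into text at the cost of about one third extra size (4 characters per 3 bytes).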
--
This message was sent by Atlassian Jira
(v8.20.10#820010)