[
https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ying Z updated FLINK-35502:
---------------------------
Summary: compress the checkpoint metadata generated by ZK/ETCD HA Services
(was: compress the checkpoint metadata ZK/ETCD HA Services)
> compress the checkpoint metadata generated by ZK/ETCD HA Services
> -----------------------------------------------------------------
>
> Key: FLINK-35502
> URL: https://issues.apache.org/jira/browse/FLINK-35502
> Project: Flink
> Issue Type: Improvement
> Reporter: Ying Z
> Priority: Major
>
> In the implementation of Flink HA, checkpoint metadata is stored in
> either ZooKeeper (ZK HA) or ETCD (K8S HA), for example:
> ```
> checkpointID-0000000000000036044: xxxx
> checkpointID-0000000000000036045: xxxx
> ...
> ...
> ```
> However, neither of these is designed to store large amounts of data. If
> the
> [state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained)
> setting is too large, it can easily cause abnormalities in ZK/ETCD.
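> As a sketch, assuming the standard Configuration API: the option is normally
> placed in flink-conf.yaml, but can also be raised programmatically (key name
> taken from the docs linked above):
> ```java
> import org.apache.flink.configuration.Configuration;
>
> Configuration conf = new Configuration();
> // Same effect as `state.checkpoints.num-retained: 1500` in flink-conf.yaml.
> conf.setInteger("state.checkpoints.num-retained", 1500);
> ```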
> The error log when state.checkpoints.num-retained is set to 1500:
> ```
> Caused by: org.apache.flink.util.SerializedThrowable:
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing:
> PUT at:
> https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader.
> Message: ConfigMap "xxx-jobmanager-leader" is invalid: []:
> Too long: must have at most 1048576 bytes. Received status:
> Status(apiVersion=v1, code=422,
> details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must
> have at most 1048576 bytes, reason=FieldValueTooLong,
> additionalProperties={})], group=null, kind=ConfigMap,
> name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null,
> additionalProperties={}), kind=Status, message=ConfigMap
> "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576
> bytes, metadata=ListMeta(_continue=null, remainingItemCount=null,
> resourceVersion=null, selfLink=null, additionalProperties={}),
> reason=Invalid, status=Failure, additionalProperties={}).
> ```
> In Flink's code, all checkpoint metadata entries are updated at the same
> time, and the metadata contains many repeated bytes, so it achieves a very
> good compression ratio.
> I therefore suggest compressing the data when writing checkpoints and
> decompressing it when reading, to reduce storage pressure and improve IO
> efficiency.
> Here is sample code, which reduces the metadata size from about 1 MB to 30 KB:
> ```java
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.util.Base64;
> import java.util.zip.GZIPOutputStream;
>
> // Serialize the checkpoint map (checkpointID -> metadata) to JSON.
> ObjectMapper objectMapper = new ObjectMapper();
> String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
>
> // Compress and Base64-encode, then store everything under a single key.
> String compressedBase64 = compressAndEncode(checkpointJson);
> compressedData.put("checkpoint-all", compressedBase64);
>
> // GZIP-compress a UTF-8 string and Base64-encode the result.
> private static String compressAndEncode(String data) throws IOException {
>     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
>     try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
>         gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
>     }
>     byte[] compressedData = outputStream.toByteArray();
>     return Base64.getEncoder().encodeToString(compressedData);
> }
> ```
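> For the read path, a matching sketch (the helper name decodeAndDecompress is
> hypothetical; it simply inverts the steps above, and the resulting JSON can
> be mapped back to the checkpoint map with the same ObjectMapper):
> ```java
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.util.Base64;
> import java.util.zip.GZIPInputStream;
>
> // Base64-decode, then GZIP-decompress back to the original JSON string.
> private static String decodeAndDecompress(String compressedBase64) throws IOException {
>     byte[] compressed = Base64.getDecoder().decode(compressedBase64);
>     ByteArrayOutputStream out = new ByteArrayOutputStream();
>     try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
>         byte[] buffer = new byte[4096];
>         int n;
>         while ((n = gzip.read(buffer)) != -1) {
>             out.write(buffer, 0, n);
>         }
>     }
>     return new String(out.toByteArray(), StandardCharsets.UTF_8);
> }
> ```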
--
This message was sent by Atlassian Jira
(v8.20.10#820010)