[
https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yue Ma updated FLINK-26050:
---------------------------
Attachment: image-2022-02-11-10-32-14-956.png
> Too many small sst files in rocksdb state backend when using processing time
> window
> -----------------------------------------------------------------------------------
>
> Key: FLINK-26050
> URL: https://issues.apache.org/jira/browse/FLINK-26050
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.10.2, 1.14.3
> Reporter: shen
> Priority: Major
> Attachments: image-2022-02-09-21-22-13-920.png,
> image-2022-02-11-10-32-14-956.png
>
>
> When using processing time windows, under some workloads there will be a lot of
> small sst files (several KB each) in the rocksdb local directory, which may cause a "Too
> many files error".
> Using the rocksdb tool ldb to inspect the content of the sst files shows:
> * the column family of these small sst files is "processing_window-timers".
> * most sst files are in level-1.
> * almost all records in the sst files are kTypeDeletion.
> * the creation times of the sst files correspond to the checkpoint interval.
> These small sst files seem to be generated when a flink checkpoint is
> triggered. Although all content in these sst files is delete tags, they are not
> compacted and deleted during rocksdb compaction because they do not intersect with
> each other (see rocksdb [compaction trivial
> move|https://github.com/facebook/rocksdb/wiki/Compaction-Trivial-Move]). And
> there seems to be no chance to delete them because they are small and do not
> intersect with other sst files.
>
> I will attach a simple program to reproduce the problem.
>
> Timers in processing time windows are generated in strictly ascending
> order (both put and delete). So if the workload of a job happens to generate level-0
> sst files that do not intersect with each other (for example: processing window size
> much smaller than checkpoint interval, and no window content crossing the checkpoint
> interval or no new data in a window crossing the checkpoint interval), there will
> be many small sst files generated until the job is restored from a savepoint, or
> incremental checkpointing is disabled.
>
> A similar problem may exist when users use timers in operators with the same
> workload.
>
> Code to reproduce the problem:
> {code:java}
> package org.apache.flink.jira;
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.RestOptions;
> import org.apache.flink.configuration.TaskManagerOptions;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import
> org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
> import java.util.Collections;
> import java.util.List;
> import java.util.Random;
> @Slf4j
> public class StreamApp {
> public static void main(String[] args) throws Exception {
> Configuration config = new Configuration();
> config.set(RestOptions.ADDRESS, "127.0.0.1");
> config.set(RestOptions.PORT, 10086);
> config.set(TaskManagerOptions.NUM_TASK_SLOTS, 6);
> new
> StreamApp().configureApp(StreamExecutionEnvironment.createLocalEnvironment(1,
> config));
> }
> public void configureApp(StreamExecutionEnvironment env) throws Exception {
> env.enableCheckpointing(20000); // 20sec
> RocksDBStateBackend rocksDBStateBackend =
> new
> RocksDBStateBackend("file:///Users/shenjiaqi/Workspace/jira/flink-51/checkpoints/",
> true); // need to be reconfigured
>
> rocksDBStateBackend.setDbStoragePath("/Users/shenjiaqi/Workspace/jira/flink-51/flink/rocksdb_local_db");
> // need to be reconfigured
> env.setStateBackend(rocksDBStateBackend);
> env.getCheckpointConfig().setCheckpointTimeout(100000);
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
> env.setParallelism(1);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.getConfig().setTaskCancellationInterval(10000);
> for (int i = 0; i < 1; ++i) {
> createOnePipeline(env);
> }
> env.execute("StreamApp");
> }
> private void createOnePipeline(StreamExecutionEnvironment env) {
> // data source is configured so that little window cross checkpoint
> interval
> DataStreamSource<String> stream = env.addSource(new Generator(1, 3000,
> 3600));
> stream.keyBy(x -> x)
> // make sure window size less than checkpoint interval. though 100ms
> is too small, I think increase this value can still reproduce the problem
> with longer time.
> .window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
> .process(new ProcessWindowFunction<String, String, String,
> TimeWindow>() {
> @Override
> public void process(String s, ProcessWindowFunction<String, String,
> String, TimeWindow>.Context context,
> Iterable<String> elements, Collector<String> out) {
> for (String ele: elements) {
> out.collect(ele);
> }
> }
> }).print();
> }
> public static final class Generator implements SourceFunction<String>,
> ListCheckpointed<Integer> {
> private static final long serialVersionUID = -2819385275681175792L;
> private final int numKeys;
> private final int idlenessMs;
> private final int recordsToEmit;
> private volatile int numRecordsEmitted = 0;
> private volatile boolean canceled = false;
> Generator(final int numKeys, final int idlenessMs, final int
> durationSeconds) {
> this.numKeys = numKeys;
> this.idlenessMs = idlenessMs;
> this.recordsToEmit = ((durationSeconds * 1000) / idlenessMs) * numKeys;
> }
> @Override
> public void run(final SourceContext<String> ctx) throws Exception {
> Random rnd = new Random();
> while (numRecordsEmitted < recordsToEmit) {
> synchronized (ctx.getCheckpointLock()) {
> for (int i = 0; i < numKeys; i++) {
> ctx.collect("" + rnd.nextInt(1));
> numRecordsEmitted++;
> }
> }
> Thread.sleep(idlenessMs);
> }
> while (!canceled) {
> Thread.sleep(50);
> }
> }
> @Override
> public void cancel() {
> canceled = true;
> }
> @Override
> public List<Integer> snapshotState(final long checkpointId, final long
> timestamp) {
> return Collections.singletonList(numRecordsEmitted);
> }
> @Override
> public void restoreState(final List<Integer> states) {
> for (final Integer state : states) {
> numRecordsEmitted += state;
> }
> }
> }
> }
> {code}
>
> Code to simulate flink checkpointing and timer creation and deletion and
> reproduce the problem:
> {code:cpp}
> //
> // main.cpp
> // reproduce
> //
> // Created by shenjiaqi on 2022/2/8.
> //
> #include <iostream>
> #include <filesystem>
> #include <cstdio>
> #include <cstdlib>
> #include <string>
> #include "rocksdb/utilities/checkpoint.h"
> #include "rocksdb/db.h"
> #include "rocksdb/slice.h"
> #include "rocksdb/options.h"
> using namespace ROCKSDB_NAMESPACE;
> using ROCKSDB_NAMESPACE::DB;
> using ROCKSDB_NAMESPACE::Options;
> using ROCKSDB_NAMESPACE::PinnableSlice;
> using ROCKSDB_NAMESPACE::ReadOptions;
> using ROCKSDB_NAMESPACE::Status;
> using ROCKSDB_NAMESPACE::WriteBatch;
> using ROCKSDB_NAMESPACE::WriteOptions;
> std::string kDBPath = "/Users/shenjiaqi/Workspace/flink/jira/data-test"; //
> need to be reconfigured
> static void createCheckpoint(rocksdb::DB *db, rocksdb::Status &s) {
> std::cout << "create checkpoint" << std::endl;
> std::string chkPath = kDBPath + "-chp";
> assert(chkPath.find("/Users/shenjiaqi/Workspace/flink/jira/") >= 0); //
> just in case
> system(("rm -rf " + chkPath).data()); // use with care.
>
> Checkpoint* checkpoint_ptr;
> s = Checkpoint::Create(db, &checkpoint_ptr);
> assert(s.ok());
>
> s = checkpoint_ptr->CreateCheckpoint(chkPath);
> assert(s.ok());
> }
> int main() {
> DB* db;
> Options options;
> // Optimize RocksDB. This is the easiest way to get RocksDB to perform
> well
> options.IncreaseParallelism();
> options.OptimizeLevelStyleCompaction();
> // create the DB if it's not already present
> options.create_if_missing = true;
> options.info_log_level = DEBUG_LEVEL;
> // options.level_compaction_dynamic_level_bytes = true;
> // open DB
> Status s = DB::Open(options, kDBPath, &db);
> assert(s.ok());
> for (int i = 0; i < 1000; ++i) {
> std::string key = "key" + /* std::to_string((int)rand()); //
> */std::to_string(i);
> std::string value = "value" + std::to_string(i);
> // Put key-value
> s = db->Put(WriteOptions(), key, value);
> assert(s.ok());
> // delete after put
> s = db->Delete(WriteOptions(), key);
> assert(s.ok());
> if (i > 0 && (i % 5) == 0) {
> createCheckpoint(db, s);
> }
> }
>
> createCheckpoint(db, s);
> return 0;
> }
> {code}
> Many log lines such as: "Moving #407 to level-1 1047 bytes" can be found in the LOG of
> rocksdb (not enabled in flink by default).
--
This message was sent by Atlassian Jira
(v8.20.1#820001)