[ https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
shen updated FLINK-26050:
-------------------------
Description:
When using processing time windows, under some workloads a large number of small SST files (several KB each) accumulate in the RocksDB local directory, which may eventually cause "too many open files" errors.
Inspecting the content of these SST files with the RocksDB ldb tool shows the following (a programmatic check is sketched after the list):
* the column family of these small SST files is "processing_window-timers";
* most of the SST files are in level-1;
* the records in these SST files are almost all kTypeDeletion;
* the creation times of the SST files correspond to the checkpoint interval.
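For reference, roughly the same check can be done without ldb. Below is a minimal sketch that reads the table properties of one of the small SST files through the RocksDB Java API (SstFileReader); the file path is a placeholder, and the property getters assume a recent RocksDB Java release, so they may need adjusting:
{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.SstFileReader;
import org.rocksdb.TableProperties;

public class SstInspect {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        // Placeholder: path of one of the small SST files in the RocksDB local directory.
        String sstFile = args.length > 0 ? args[0] : "/path/to/rocksdb_local_db/000123.sst";
        try (Options options = new Options();
             SstFileReader reader = new SstFileReader(options)) {
            reader.open(sstFile);
            TableProperties props = reader.getTableProperties();
            // For the problematic files, nearly every entry is a delete tombstone.
            System.out.println("column family: " + new String(props.getColumnFamilyName()));
            System.out.println("entries:       " + props.getNumEntries());
            System.out.println("deletions:     " + props.getNumDeletions());
            System.out.println("data size:     " + props.getDataSize());
        }
    }
}
{code}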
These small SST files seem to be generated whenever a Flink checkpoint is triggered. Although their content consists entirely of delete tombstones, they are never compacted away, because they do not overlap with each other and are therefore only trivially moved between levels (see RocksDB [compaction trivial move|https://github.com/facebook/rocksdb/wiki/Compaction-Trivial-Move]). Given their small size and the fact that they do not overlap with any other SST files, there seems to be no chance for RocksDB to ever delete them.
I will attach a simple program to reproduce the problem.
Timers in processing time windows are created and deleted in strictly ascending order (both the puts and the deletes). So if the job's workload happens to produce level-0 SST files that do not overlap with each other (for example, when the processing window size is much smaller than the checkpoint interval and no window content crosses a checkpoint boundary, or no new data arrives for a window that does cross one), many small SST files keep accumulating until the job is restored from a savepoint or incremental checkpointing is disabled.
A similar problem may exist when users register processing-time timers directly in their own operators under the same kind of workload; a hedged sketch of such timer usage is shown below.
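To make that last point concrete, here is a minimal sketch (separate from the reproducer below) of a KeyedProcessFunction that registers processing-time timers; because processing time only moves forward, the timers are created and later removed in strictly ascending order, which should produce the same put/delete pattern in the timer column family:
{code:java}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AscendingTimerFunction extends KeyedProcessFunction<String, String, String> {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // Register a processing-time timer slightly in the future; timer timestamps
        // only ever increase because processing time only moves forward.
        long timer = ctx.timerService().currentProcessingTime() + 100;
        ctx.timerService().registerProcessingTimeTimer(timer);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // When the timer fires it is removed from the timer state, producing a delete
        // for a timer key that will never be written again.
    }
}
{code}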
Code to reproduce the problem:
{code:java}
package org.apache.flink.jira;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.List;
import java.util.Random;

@Slf4j
public class StreamApp {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set(RestOptions.ADDRESS, "127.0.0.1");
        config.set(RestOptions.PORT, 10086);
        config.set(TaskManagerOptions.NUM_TASK_SLOTS, 6);
        new StreamApp().configureApp(StreamExecutionEnvironment.createLocalEnvironment(1, config));
    }

    public void configureApp(StreamExecutionEnvironment env) throws Exception {
        env.enableCheckpointing(20000); // 20 sec
        RocksDBStateBackend rocksDBStateBackend =
                new RocksDBStateBackend("file:///Users/shenjiaqi/Workspace/jira/flink-51/checkpoints/", true); // need to be reconfigured
        rocksDBStateBackend.setDbStoragePath("/Users/shenjiaqi/Workspace/jira/flink-51/flink/rocksdb_local_db"); // need to be reconfigured
        env.setStateBackend(rocksDBStateBackend);
        env.getCheckpointConfig().setCheckpointTimeout(100000);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setTaskCancellationInterval(10000);
        for (int i = 0; i < 1; ++i) {
            createOnePipeline(env);
        }
        env.execute("StreamApp");
    }

    private void createOnePipeline(StreamExecutionEnvironment env) {
        // The data source is configured so that few windows cross a checkpoint interval.
        DataStreamSource<String> stream = env.addSource(new Generator(1, 3000, 3600));
        stream.keyBy(x -> x)
                // Make sure the window size is less than the checkpoint interval. 100ms is very small,
                // but larger values should still reproduce the problem, just over a longer time.
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
                .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void process(String s,
                                        ProcessWindowFunction<String, String, String, TimeWindow>.Context context,
                                        Iterable<String> elements, Collector<String> out) {
                        for (String ele : elements) {
                            out.collect(ele);
                        }
                    }
                }).print();
    }

    public static final class Generator implements SourceFunction<String>, ListCheckpointed<Integer> {
        private static final long serialVersionUID = -2819385275681175792L;
        private final int numKeys;
        private final int idlenessMs;
        private final int recordsToEmit;
        private volatile int numRecordsEmitted = 0;
        private volatile boolean canceled = false;

        Generator(final int numKeys, final int idlenessMs, final int durationSeconds) {
            this.numKeys = numKeys;
            this.idlenessMs = idlenessMs;
            this.recordsToEmit = ((durationSeconds * 1000) / idlenessMs) * numKeys;
        }

        @Override
        public void run(final SourceContext<String> ctx) throws Exception {
            Random rnd = new Random();
            while (numRecordsEmitted < recordsToEmit) {
                synchronized (ctx.getCheckpointLock()) {
                    for (int i = 0; i < numKeys; i++) {
                        ctx.collect("" + rnd.nextInt(1));
                        numRecordsEmitted++;
                    }
                }
                Thread.sleep(idlenessMs);
            }
            while (!canceled) {
                Thread.sleep(50);
            }
        }

        @Override
        public void cancel() {
            canceled = true;
        }

        @Override
        public List<Integer> snapshotState(final long checkpointId, final long timestamp) {
            return Collections.singletonList(numRecordsEmitted);
        }

        @Override
        public void restoreState(final List<Integer> states) {
            for (final Integer state : states) {
                numRecordsEmitted += state;
            }
        }
    }
}
{code}
Code that simulates Flink's checkpointing together with timer creation and deletion, and reproduces the problem:
{code:cpp}
//
// main.cpp
// reproduce
//
// Created by shenjiaqi on 2022/2/8.
//
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <filesystem>
#include <iostream>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/checkpoint.h"

using namespace ROCKSDB_NAMESPACE;
using ROCKSDB_NAMESPACE::DB;
using ROCKSDB_NAMESPACE::Options;
using ROCKSDB_NAMESPACE::PinnableSlice;
using ROCKSDB_NAMESPACE::ReadOptions;
using ROCKSDB_NAMESPACE::Status;
using ROCKSDB_NAMESPACE::WriteBatch;
using ROCKSDB_NAMESPACE::WriteOptions;

std::string kDBPath = "/Users/shenjiaqi/Workspace/flink/jira/data-test"; // need to be reconfigured

static void createCheckpoint(rocksdb::DB *db, rocksdb::Status &s) {
    std::cout << "create checkpoint" << std::endl;
    std::string chkPath = kDBPath + "-chp";
    // Just in case: only remove the checkpoint directory if it lives under the expected prefix.
    assert(chkPath.find("/Users/shenjiaqi/Workspace/flink/jira/") != std::string::npos);
    system(("rm -rf " + chkPath).data()); // use with care.

    Checkpoint* checkpoint_ptr;
    s = Checkpoint::Create(db, &checkpoint_ptr);
    assert(s.ok());

    s = checkpoint_ptr->CreateCheckpoint(chkPath);
    assert(s.ok());
    delete checkpoint_ptr;
}

int main() {
    DB* db;
    Options options;
    // Optimize RocksDB. This is the easiest way to get RocksDB to perform well.
    options.IncreaseParallelism();
    options.OptimizeLevelStyleCompaction();
    // Create the DB if it is not already present.
    options.create_if_missing = true;
    options.info_log_level = DEBUG_LEVEL;
    // options.level_compaction_dynamic_level_bytes = true;

    // Open the DB.
    Status s = DB::Open(options, kDBPath, &db);
    assert(s.ok());

    for (int i = 0; i < 1000; ++i) {
        std::string key = "key" + /* std::to_string((int)rand()); // */ std::to_string(i);
        std::string value = "value" + std::to_string(i);
        // Put the key-value pair, then delete it again, mimicking timer creation and deletion.
        s = db->Put(WriteOptions(), key, value);
        assert(s.ok());
        s = db->Delete(WriteOptions(), key);
        assert(s.ok());
        // Create a checkpoint every few writes, mimicking Flink's periodic checkpoints.
        if (i > 0 && (i % 5) == 0) {
            createCheckpoint(db, s);
        }
    }

    createCheckpoint(db, s);
    return 0;
}
{code}
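After the simulator finishes, the accumulation of tiny non-overlapping level-1 files can also be confirmed programmatically. This is a minimal sketch using the RocksDB Java API (openReadOnly plus getLiveFilesMetaData); the path must match kDBPath above:
{code:java}
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class ListSstFiles {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        String dbPath = "/Users/shenjiaqi/Workspace/flink/jira/data-test"; // same as kDBPath above
        try (Options options = new Options();
             RocksDB db = RocksDB.openReadOnly(options, dbPath)) {
            for (LiveFileMetaData file : db.getLiveFilesMetaData()) {
                // Expectation: many files of only about 1 KB, all sitting in level 1.
                System.out.println(file.fileName() + " level=" + file.level() + " size=" + file.size() + " bytes");
            }
        }
    }
}
{code}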
Many log lines such as "Moving #407 to level-1 1047 bytes" can be found in the RocksDB LOG (which is not enabled in Flink by default).
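For anyone who wants to see these lines from inside Flink, here is a minimal sketch of how the native RocksDB log could be enabled through a custom RocksDBOptionsFactory (interface as in recent Flink 1.x releases; the log directory is a placeholder):
{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

import java.util.Collection;

public class VerboseRocksDBOptionsFactory implements RocksDBOptionsFactory {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Keep the RocksDB LOG (including the "Moving #... to level-1 ..." trivial-move lines)
        // in a dedicated directory instead of discarding it.
        return currentOptions
                .setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
                .setDbLogDir("/tmp/rocksdb-logs"); // placeholder path
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
}
{code}
It could then be attached to the backend in the reproducer via rocksDBStateBackend.setRocksDBOptions(new VerboseRocksDBOptionsFactory()).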
> Too many small sst files in rocksdb state backend when using processing time
> window
> -----------------------------------------------------------------------------------
>
> Key: FLINK-26050
> URL: https://issues.apache.org/jira/browse/FLINK-26050
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.10.2, 1.14.3
> Reporter: shen
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)