This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 01ca71c3e feat(thirdparty): use official rocksdb (#1048)
01ca71c3e is described below
commit 01ca71c3e9c4d9ac679126cbd9cb2ed3036532a0
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Nov 8 23:50:36 2022 +0800
feat(thirdparty): use official rocksdb (#1048)
---
.licenserc.yaml | 1 +
src/server/pegasus_server_impl.cpp | 5 +
thirdparty/CMakeLists.txt | 8 +-
thirdparty/rocksdb_fix_atomic_flush_0879c240.patch | 213 +++++++++++++++++++++
4 files changed, 224 insertions(+), 3 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 4704ac7ab..e7b302fed 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -87,6 +87,7 @@ header:
- 'thirdparty/fix_libevent_for_macos.patch'
- 'thirdparty/fix_s2_for_aarch64.patch'
- 'thirdparty/fix_thrift_for_cpp11.patch'
+ - 'thirdparty/rocksdb_fix_atomic_flush_0879c240.patch'
# should be empty, or ignore all comment lines
- 'src/utils/test/config-empty.ini'
# The MIT License (MIT), Copyright (c) 2015 Microsoft Corporation
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 189b1e030..29e7abc46 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -3160,6 +3160,11 @@ bool pegasus_server_impl::set_options(
auto s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), path,
&column_families);
if (!s.ok()) {
LOG_ERROR_PREFIX("rocksdb::DB::ListColumnFamilies failed, error = {}",
s.ToString());
+ if (s.IsCorruption() &&
+ s.ToString().find("VersionEdit: unknown tag") !=
std::string::npos) {
+ LOG_ERROR_PREFIX("there are some unknown tags in MANIFEST, make
sure you are upgrade "
+ "from Pegasus 2.1 or higher version");
+ }
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
diff --git a/thirdparty/CMakeLists.txt b/thirdparty/CMakeLists.txt
index 6b742efd2..3dd125d3c 100644
--- a/thirdparty/CMakeLists.txt
+++ b/thirdparty/CMakeLists.txt
@@ -390,10 +390,12 @@ ExternalProject_Add(jemalloc
option(ROCKSDB_PORTABLE "build a portable binary" OFF)
+# The patch name suffix '0879c240' refers to this upstream rocksdb commit:
+# https://github.com/facebook/rocksdb/commit/0879c240404b00142ba4718f36cd3f2bd537192d
ExternalProject_Add(rocksdb
- URL
${OSS_URL_PREFIX}/pegasus-rocksdb-ef29819c7a1ea9334ae170f506b653757f517a52.zip
-
https://github.com/XiaoMi/pegasus-rocksdb/archive/ef29819c7a1ea9334ae170f506b653757f517a52.zip
- URL_MD5 2a6488cd87c37e0c2e42a5ca74bebc87
+ URL https://github.com/facebook/rocksdb/archive/refs/tags/v6.6.4.tar.gz
+ URL_MD5 7f7fcca3e96b7d83ef332804c90070c8
+ PATCH_COMMAND patch -p1 <
${TP_DIR}/rocksdb_fix_atomic_flush_0879c240.patch
DEPENDS jemalloc
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${TP_OUTPUT}
-DFAIL_ON_WARNINGS=OFF
diff --git a/thirdparty/rocksdb_fix_atomic_flush_0879c240.patch b/thirdparty/rocksdb_fix_atomic_flush_0879c240.patch
new file mode 100644
index 000000000..9f90da13e
--- /dev/null
+++ b/thirdparty/rocksdb_fix_atomic_flush_0879c240.patch
@@ -0,0 +1,213 @@
+diff --git a/db/db_impl/db_impl_compaction_flush.cc
b/db/db_impl/db_impl_compaction_flush.cc
+index f399b97d1..04760f832 100644
+--- a/db/db_impl/db_impl_compaction_flush.cc
++++ b/db/db_impl/db_impl_compaction_flush.cc
+@@ -466,6 +466,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
+ autovector<const autovector<MemTable*>*> mems_list;
+ autovector<const MutableCFOptions*> mutable_cf_options_list;
+ autovector<FileMetaData*> tmp_file_meta;
++ autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
++ committed_flush_jobs_info;
+ for (int i = 0; i != num_cfs; ++i) {
+ const auto& mems = jobs[i]->GetMemTables();
+ if (!cfds[i]->IsDropped() && !mems.empty()) {
+@@ -473,13 +475,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
+ mems_list.emplace_back(&mems);
+ mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
+ tmp_file_meta.emplace_back(&file_meta[i]);
++#ifndef ROCKSDB_LITE
++ committed_flush_jobs_info.emplace_back(
++ jobs[i]->GetCommittedFlushJobsInfo());
++#endif //! ROCKSDB_LITE
+ }
+ }
+
+ s = InstallMemtableAtomicFlushResults(
+ nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
+ versions_.get(), &mutex_, tmp_file_meta,
+- &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
++ committed_flush_jobs_info, &job_context->memtables_to_free,
++ directories_.GetDbDir(), log_buffer);
+ }
+
+ if (s.ok()) {
+diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc
+index fec737942..d55645696 100644
+--- a/db/flush_job_test.cc
++++ b/db/flush_job_test.cc
+@@ -388,10 +388,18 @@ TEST_F(FlushJobTest,
FlushMemtablesMultipleColumnFamilies) {
+ for (auto cfd : all_cfds) {
+ mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
+ }
++ autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
++ committed_flush_jobs_info;
++#ifndef ROCKSDB_LITE
++ for (auto& job : flush_jobs) {
++ committed_flush_jobs_info.push_back(job->GetCommittedFlushJobsInfo());
++ }
++#endif //! ROCKSDB_LITE
+
+ Status s = InstallMemtableAtomicFlushResults(
+ nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
+- versions_.get(), &mutex_, file_meta_ptrs,
&job_context.memtables_to_free,
++ versions_.get(), &mutex_, file_meta_ptrs,
++ committed_flush_jobs_info, &job_context.memtables_to_free,
+ nullptr /* db_directory */, nullptr /* log_buffer */);
+ ASSERT_OK(s);
+
+diff --git a/db/listener_test.cc b/db/listener_test.cc
+index 0e8bae407..8ba5e0060 100644
+--- a/db/listener_test.cc
++++ b/db/listener_test.cc
+@@ -350,32 +350,38 @@ TEST_F(EventListenerTest, MultiCF) {
+ #ifdef ROCKSDB_USING_THREAD_STATUS
+ options.enable_thread_tracking = true;
+ #endif // ROCKSDB_USING_THREAD_STATUS
+- TestFlushListener* listener = new TestFlushListener(options.env, this);
+- options.listeners.emplace_back(listener);
+- options.table_properties_collector_factories.push_back(
+- std::make_shared<TestPropertiesCollectorFactory>());
+- std::vector<std::string> cf_names = {
+- "pikachu", "ilya", "muromec", "dobrynia",
+- "nikitich", "alyosha", "popovich"};
+- CreateAndReopenWithCF(cf_names, options);
+-
+- ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
+- ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
+- ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
+- ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
+- ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
+- ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
+- ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
+- for (int i = 1; i < 8; ++i) {
+- ASSERT_OK(Flush(i));
+- ASSERT_EQ(listener->flushed_dbs_.size(), i);
+- ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
+- }
++ for (auto atomic_flush : {false, true}) {
++ options.atomic_flush = atomic_flush;
++ options.create_if_missing = true;
++ DestroyAndReopen(options);
++ TestFlushListener* listener = new TestFlushListener(options.env, this);
++ options.listeners.emplace_back(listener);
++ options.table_properties_collector_factories.push_back(
++ std::make_shared<TestPropertiesCollectorFactory>());
++ std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
++ "dobrynia", "nikitich", "alyosha",
++ "popovich"};
++ CreateAndReopenWithCF(cf_names, options);
++
++ ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
++ ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
++ ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
++ ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
++ ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
++ ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
++ ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
++ for (int i = 1; i < 8; ++i) {
++ ASSERT_OK(Flush(i));
++ ASSERT_EQ(listener->flushed_dbs_.size(), i);
++ ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
++ }
+
+- // make sure callback functions are called in the right order
+- for (size_t i = 0; i < cf_names.size(); i++) {
+- ASSERT_EQ(listener->flushed_dbs_[i], db_);
+- ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
++ // make sure callback functions are called in the right order
++ for (size_t i = 0; i < cf_names.size(); i++) {
++ ASSERT_EQ(listener->flushed_dbs_[i], db_);
++ ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
++ }
++ Close();
+ }
+ }
+
+diff --git a/db/memtable_list.cc b/db/memtable_list.cc
+index 3281c96d0..6901e45f4 100644
+--- a/db/memtable_list.cc
++++ b/db/memtable_list.cc
+@@ -641,6 +641,8 @@ Status InstallMemtableAtomicFlushResults(
+ const autovector<const MutableCFOptions*>& mutable_cf_options_list,
+ const autovector<const autovector<MemTable*>*>& mems_list, VersionSet*
vset,
+ InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
++ const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
++ committed_flush_jobs_info,
+ autovector<MemTable*>* to_delete, Directory* db_directory,
+ LogBuffer* log_buffer) {
+ AutoThreadOperationStageUpdater stage_updater(
+@@ -666,6 +668,17 @@ Status InstallMemtableAtomicFlushResults(
+ (*mems_list[k])[i]->SetFlushCompleted(true);
+ (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
+ }
++#ifndef ROCKSDB_LITE
++ if (committed_flush_jobs_info[k]) {
++ assert(!mems_list[k]->empty());
++ assert((*mems_list[k])[0]);
++ std::unique_ptr<FlushJobInfo> flush_job_info =
++ (*mems_list[k])[0]->ReleaseFlushJobInfo();
++ committed_flush_jobs_info[k]->push_back(std::move(flush_job_info));
++ }
++#else //! ROCKSDB_LITE
++ (void)committed_flush_jobs_info;
++#endif // ROCKSDB_LITE
+ }
+
+ Status s;
+diff --git a/db/memtable_list.h b/db/memtable_list.h
+index cbf42ac08..2bd605089 100644
+--- a/db/memtable_list.h
++++ b/db/memtable_list.h
+@@ -137,6 +137,8 @@ class MemTableListVersion {
+ const autovector<const autovector<MemTable*>*>& mems_list,
+ VersionSet* vset, InstrumentedMutex* mu,
+ const autovector<FileMetaData*>& file_meta,
++ const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
++ committed_flush_jobs_info,
+ autovector<MemTable*>* to_delete, Directory* db_directory,
+ LogBuffer* log_buffer);
+
+@@ -375,6 +377,8 @@ class MemTableList {
+ const autovector<const autovector<MemTable*>*>& mems_list,
+ VersionSet* vset, InstrumentedMutex* mu,
+ const autovector<FileMetaData*>& file_meta,
++ const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
++ committed_flush_jobs_info,
+ autovector<MemTable*>* to_delete, Directory* db_directory,
+ LogBuffer* log_buffer);
+
+@@ -417,6 +421,8 @@ extern Status InstallMemtableAtomicFlushResults(
+ const autovector<const MutableCFOptions*>& mutable_cf_options_list,
+ const autovector<const autovector<MemTable*>*>& mems_list, VersionSet*
vset,
+ InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
++ const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
++ committed_flush_jobs_info,
+ autovector<MemTable*>* to_delete, Directory* db_directory,
+ LogBuffer* log_buffer);
+ } // namespace rocksdb
+diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc
+index 32a227f4b..b88d37997 100644
+--- a/db/memtable_list_test.cc
++++ b/db/memtable_list_test.cc
+@@ -175,11 +175,20 @@ class MemTableListTest : public testing::Test {
+ for (auto& meta : file_metas) {
+ file_meta_ptrs.push_back(&meta);
+ }
++ std::vector<std::list<std::unique_ptr<FlushJobInfo>>>
++ committed_flush_jobs_info_storage(cf_ids.size());
++ autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
++ committed_flush_jobs_info;
++ for (int i = 0; i < static_cast<int>(cf_ids.size()); ++i) {
++ committed_flush_jobs_info.push_back(
++ &committed_flush_jobs_info_storage[i]);
++ }
++
+ InstrumentedMutex mutex;
+ InstrumentedMutexLock l(&mutex);
+ return InstallMemtableAtomicFlushResults(
+ &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
+- file_meta_ptrs, to_delete, nullptr, &log_buffer);
++ file_meta_ptrs, committed_flush_jobs_info, to_delete, nullptr,
&log_buffer);
+ }
+ };
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]