This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 01ca71c3e feat(thirdparty): use official rocksdb (#1048)
01ca71c3e is described below

commit 01ca71c3e9c4d9ac679126cbd9cb2ed3036532a0
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Nov 8 23:50:36 2022 +0800

    feat(thirdparty): use official rocksdb (#1048)
---
 .licenserc.yaml                                    |   1 +
 src/server/pegasus_server_impl.cpp                 |   5 +
 thirdparty/CMakeLists.txt                          |   8 +-
 thirdparty/rocksdb_fix_atomic_flush_0879c240.patch | 213 +++++++++++++++++++++
 4 files changed, 224 insertions(+), 3 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index 4704ac7ab..e7b302fed 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -87,6 +87,7 @@ header:
     - 'thirdparty/fix_libevent_for_macos.patch'
     - 'thirdparty/fix_s2_for_aarch64.patch'
     - 'thirdparty/fix_thrift_for_cpp11.patch'
+    - 'thirdparty/rocksdb_fix_atomic_flush_0879c240.patch'
     # should be empty, or ignore all comment lines
     - 'src/utils/test/config-empty.ini'
     # The MIT License (MIT), Copyright (c) 2015 Microsoft Corporation
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 189b1e030..29e7abc46 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -3160,6 +3160,11 @@ bool pegasus_server_impl::set_options(
     auto s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), path, 
&column_families);
     if (!s.ok()) {
         LOG_ERROR_PREFIX("rocksdb::DB::ListColumnFamilies failed, error = {}", 
s.ToString());
+        if (s.IsCorruption() &&
+            s.ToString().find("VersionEdit: unknown tag") != 
std::string::npos) {
+            LOG_ERROR_PREFIX("there are some unknown tags in MANIFEST, make 
sure you are upgrade "
+                             "from Pegasus 2.1 or higher version");
+        }
         return ::dsn::ERR_LOCAL_APP_FAILURE;
     }
 
diff --git a/thirdparty/CMakeLists.txt b/thirdparty/CMakeLists.txt
index 6b742efd2..3dd125d3c 100644
--- a/thirdparty/CMakeLists.txt
+++ b/thirdparty/CMakeLists.txt
@@ -390,10 +390,12 @@ ExternalProject_Add(jemalloc
 
 option(ROCKSDB_PORTABLE "build a portable binary" OFF)
 
+# The '0879c240' suffix in the patch name refers to this upstream rocksdb commit:
+# https://github.com/facebook/rocksdb/commit/0879c240404b00142ba4718f36cd3f2bd537192d
 ExternalProject_Add(rocksdb
-        URL 
${OSS_URL_PREFIX}/pegasus-rocksdb-ef29819c7a1ea9334ae170f506b653757f517a52.zip
-        
https://github.com/XiaoMi/pegasus-rocksdb/archive/ef29819c7a1ea9334ae170f506b653757f517a52.zip
-        URL_MD5 2a6488cd87c37e0c2e42a5ca74bebc87
+        URL https://github.com/facebook/rocksdb/archive/refs/tags/v6.6.4.tar.gz
+        URL_MD5 7f7fcca3e96b7d83ef332804c90070c8
+        PATCH_COMMAND patch -p1 < 
${TP_DIR}/rocksdb_fix_atomic_flush_0879c240.patch
         DEPENDS jemalloc
         CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${TP_OUTPUT}
         -DFAIL_ON_WARNINGS=OFF
diff --git a/thirdparty/rocksdb_fix_atomic_flush_0879c240.patch b/thirdparty/rocksdb_fix_atomic_flush_0879c240.patch
new file mode 100644
index 000000000..9f90da13e
--- /dev/null
+++ b/thirdparty/rocksdb_fix_atomic_flush_0879c240.patch
@@ -0,0 +1,213 @@
+diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
+index f399b97d1..04760f832 100644
+--- a/db/db_impl/db_impl_compaction_flush.cc
++++ b/db/db_impl/db_impl_compaction_flush.cc
+@@ -466,6 +466,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
+     autovector<const autovector<MemTable*>*> mems_list;
+     autovector<const MutableCFOptions*> mutable_cf_options_list;
+     autovector<FileMetaData*> tmp_file_meta;
++    autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
++        committed_flush_jobs_info;
+     for (int i = 0; i != num_cfs; ++i) {
+       const auto& mems = jobs[i]->GetMemTables();
+       if (!cfds[i]->IsDropped() && !mems.empty()) {
+@@ -473,13 +475,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
+         mems_list.emplace_back(&mems);
+         mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
+         tmp_file_meta.emplace_back(&file_meta[i]);
++#ifndef ROCKSDB_LITE
++        committed_flush_jobs_info.emplace_back(
++            jobs[i]->GetCommittedFlushJobsInfo());
++#endif  //! ROCKSDB_LITE
+       }
+     }
+ 
+     s = InstallMemtableAtomicFlushResults(
+         nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
+         versions_.get(), &mutex_, tmp_file_meta,
+-        &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
++        committed_flush_jobs_info, &job_context->memtables_to_free,
++        directories_.GetDbDir(), log_buffer);
+   }
+ 
+   if (s.ok()) {
+diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc
+index fec737942..d55645696 100644
+--- a/db/flush_job_test.cc
++++ b/db/flush_job_test.cc
+@@ -388,10 +388,18 @@ TEST_F(FlushJobTest, 
FlushMemtablesMultipleColumnFamilies) {
+   for (auto cfd : all_cfds) {
+     mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
+   }
++  autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
++      committed_flush_jobs_info;
++#ifndef ROCKSDB_LITE
++  for (auto& job : flush_jobs) {
++    committed_flush_jobs_info.push_back(job->GetCommittedFlushJobsInfo());
++  }
++#endif  //! ROCKSDB_LITE
+ 
+   Status s = InstallMemtableAtomicFlushResults(
+       nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
+-      versions_.get(), &mutex_, file_meta_ptrs, 
&job_context.memtables_to_free,
++      versions_.get(), &mutex_, file_meta_ptrs,
++      committed_flush_jobs_info, &job_context.memtables_to_free,
+       nullptr /* db_directory */, nullptr /* log_buffer */);
+   ASSERT_OK(s);
+ 
+diff --git a/db/listener_test.cc b/db/listener_test.cc
+index 0e8bae407..8ba5e0060 100644
+--- a/db/listener_test.cc
++++ b/db/listener_test.cc
+@@ -350,32 +350,38 @@ TEST_F(EventListenerTest, MultiCF) {
+ #ifdef ROCKSDB_USING_THREAD_STATUS
+   options.enable_thread_tracking = true;
+ #endif  // ROCKSDB_USING_THREAD_STATUS
+-  TestFlushListener* listener = new TestFlushListener(options.env, this);
+-  options.listeners.emplace_back(listener);
+-  options.table_properties_collector_factories.push_back(
+-      std::make_shared<TestPropertiesCollectorFactory>());
+-  std::vector<std::string> cf_names = {
+-      "pikachu", "ilya", "muromec", "dobrynia",
+-      "nikitich", "alyosha", "popovich"};
+-  CreateAndReopenWithCF(cf_names, options);
+-
+-  ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
+-  ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
+-  ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
+-  ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
+-  ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
+-  ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
+-  ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
+-  for (int i = 1; i < 8; ++i) {
+-    ASSERT_OK(Flush(i));
+-    ASSERT_EQ(listener->flushed_dbs_.size(), i);
+-    ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
+-  }
++  for (auto atomic_flush : {false, true}) {
++    options.atomic_flush = atomic_flush;
++    options.create_if_missing = true;
++    DestroyAndReopen(options);
++    TestFlushListener* listener = new TestFlushListener(options.env, this);
++    options.listeners.emplace_back(listener);
++    options.table_properties_collector_factories.push_back(
++        std::make_shared<TestPropertiesCollectorFactory>());
++    std::vector<std::string> cf_names = {"pikachu",  "ilya",     "muromec",
++                                         "dobrynia", "nikitich", "alyosha",
++                                         "popovich"};
++    CreateAndReopenWithCF(cf_names, options);
++
++    ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
++    ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
++    ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
++    ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
++    ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
++    ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
++    ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
++    for (int i = 1; i < 8; ++i) {
++      ASSERT_OK(Flush(i));
++      ASSERT_EQ(listener->flushed_dbs_.size(), i);
++      ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
++    }
+ 
+-  // make sure callback functions are called in the right order
+-  for (size_t i = 0; i < cf_names.size(); i++) {
+-    ASSERT_EQ(listener->flushed_dbs_[i], db_);
+-    ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
++    // make sure callback functions are called in the right order
++    for (size_t i = 0; i < cf_names.size(); i++) {
++      ASSERT_EQ(listener->flushed_dbs_[i], db_);
++      ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
++    }
++    Close();
+   }
+ }
+ 
+diff --git a/db/memtable_list.cc b/db/memtable_list.cc
+index 3281c96d0..6901e45f4 100644
+--- a/db/memtable_list.cc
++++ b/db/memtable_list.cc
+@@ -641,6 +641,8 @@ Status InstallMemtableAtomicFlushResults(
+     const autovector<const MutableCFOptions*>& mutable_cf_options_list,
+     const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* 
vset,
+     InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
++    const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
++        committed_flush_jobs_info,
+     autovector<MemTable*>* to_delete, Directory* db_directory,
+     LogBuffer* log_buffer) {
+   AutoThreadOperationStageUpdater stage_updater(
+@@ -666,6 +668,17 @@ Status InstallMemtableAtomicFlushResults(
+       (*mems_list[k])[i]->SetFlushCompleted(true);
+       (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
+     }
++#ifndef ROCKSDB_LITE
++    if (committed_flush_jobs_info[k]) {
++      assert(!mems_list[k]->empty());
++      assert((*mems_list[k])[0]);
++      std::unique_ptr<FlushJobInfo> flush_job_info =
++          (*mems_list[k])[0]->ReleaseFlushJobInfo();
++      committed_flush_jobs_info[k]->push_back(std::move(flush_job_info));
++    }
++#else   //! ROCKSDB_LITE
++    (void)committed_flush_jobs_info;
++#endif  // ROCKSDB_LITE
+   }
+ 
+   Status s;
+diff --git a/db/memtable_list.h b/db/memtable_list.h
+index cbf42ac08..2bd605089 100644
+--- a/db/memtable_list.h
++++ b/db/memtable_list.h
+@@ -137,6 +137,8 @@ class MemTableListVersion {
+       const autovector<const autovector<MemTable*>*>& mems_list,
+       VersionSet* vset, InstrumentedMutex* mu,
+       const autovector<FileMetaData*>& file_meta,
++      const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
++          committed_flush_jobs_info,
+       autovector<MemTable*>* to_delete, Directory* db_directory,
+       LogBuffer* log_buffer);
+ 
+@@ -375,6 +377,8 @@ class MemTableList {
+       const autovector<const autovector<MemTable*>*>& mems_list,
+       VersionSet* vset, InstrumentedMutex* mu,
+       const autovector<FileMetaData*>& file_meta,
++      const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
++          committed_flush_jobs_info,
+       autovector<MemTable*>* to_delete, Directory* db_directory,
+       LogBuffer* log_buffer);
+ 
+@@ -417,6 +421,8 @@ extern Status InstallMemtableAtomicFlushResults(
+     const autovector<const MutableCFOptions*>& mutable_cf_options_list,
+     const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* 
vset,
+     InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
++    const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
++        committed_flush_jobs_info,
+     autovector<MemTable*>* to_delete, Directory* db_directory,
+     LogBuffer* log_buffer);
+ }  // namespace rocksdb
+diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc
+index 32a227f4b..b88d37997 100644
+--- a/db/memtable_list_test.cc
++++ b/db/memtable_list_test.cc
+@@ -175,11 +175,20 @@ class MemTableListTest : public testing::Test {
+     for (auto& meta : file_metas) {
+       file_meta_ptrs.push_back(&meta);
+     }
++    std::vector<std::list<std::unique_ptr<FlushJobInfo>>>
++        committed_flush_jobs_info_storage(cf_ids.size());
++    autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
++        committed_flush_jobs_info;
++    for (int i = 0; i < static_cast<int>(cf_ids.size()); ++i) {
++      committed_flush_jobs_info.push_back(
++          &committed_flush_jobs_info_storage[i]);
++    }
++
+     InstrumentedMutex mutex;
+     InstrumentedMutexLock l(&mutex);
+     return InstallMemtableAtomicFlushResults(
+         &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
+-        file_meta_ptrs, to_delete, nullptr, &log_buffer);
++        file_meta_ptrs, committed_flush_jobs_info, to_delete, nullptr, 
&log_buffer);
+   }
+ };
+ 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to