This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b0519d29efa [fix](inverted index) Writing to the inverted index also
writes to the file cache. (#39076)
b0519d29efa is described below
commit b0519d29efa43801ae5dd5dfa1e3849632bf731a
Author: zzzxl <[email protected]>
AuthorDate: Tue Aug 13 16:38:55 2024 +0800
[fix](inverted index) Writing to the inverted index also writes to the file
cache. (#39076)
1. When write_file_cache is true, writing to the inverted index also
writes to the file cache.
---
be/src/olap/compaction.cpp | 3 +
be/src/olap/rowset/beta_rowset_writer.cpp | 8 +-
be/src/olap/rowset/rowset_writer_context.h | 10 ++
.../segment_v2/inverted_index_file_writer.cpp | 3 +
.../rowset/segment_v2/inverted_index_file_writer.h | 3 +
.../segment_v2/inverted_index_fs_directory.cpp | 18 +++-
.../segment_v2/inverted_index_fs_directory.h | 9 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 2 +
.../rowset/segment_v2/vertical_segment_writer.cpp | 2 +
be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 9 +-
.../test_index_writer_file_cache.groovy | 116 +++++++++++++++++++++
11 files changed, 162 insertions(+), 21 deletions(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 8c109eec1c1..9ed27bad382 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -686,6 +686,9 @@ Status Compaction::do_inverted_index_compaction() {
<< st;
return st;
}
+ for (const auto& writer : inverted_index_file_writers) {
+ writer->set_file_writer_opts(ctx.get_file_writer_options());
+ }
}
// use tmp file dir to store index files
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index f3a0ade24f3..ec1bba7621b 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -846,13 +846,7 @@ Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr&
rowset_ptr) {
Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path,
io::FileWriterPtr&
file_writer) {
- io::FileWriterOptions opts {
- .write_file_cache = _context.write_file_cache,
- .is_cold_data = _context.is_hot_data,
- .file_cache_expiration =
- _context.file_cache_ttl_sec > 0 && _context.newest_write_timestamp > 0
- ? _context.newest_write_timestamp + _context.file_cache_ttl_sec
- : 0};
+ io::FileWriterOptions opts = _context.get_file_writer_options();
Status st = _context.fs()->create_file(path, &file_writer, &opts);
if (!st.ok()) {
LOG(WARNING) << "failed to create writable file. path=" << path << ",
err: " << st;
diff --git a/be/src/olap/rowset/rowset_writer_context.h
b/be/src/olap/rowset/rowset_writer_context.h
index 0130916bfb4..e13f7efe6e9 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -140,6 +140,16 @@ struct RowsetWriterContext {
return *storage_resource->fs;
}
}
+
+ io::FileWriterOptions get_file_writer_options() const {
+ io::FileWriterOptions opts {
+ .write_file_cache = write_file_cache,
+ .is_cold_data = is_hot_data,
+ .file_cache_expiration = file_cache_ttl_sec > 0 && newest_write_timestamp > 0
+ ? newest_write_timestamp + file_cache_ttl_sec
+ : 0};
+ return opts;
+ }
};
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
index f2ac0e92265..6eb54878924 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -283,6 +283,7 @@ size_t InvertedIndexFileWriter::write_v1() {
ram_dir.close();
auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs,
idx_path.c_str());
+ out_dir->set_file_writer_opts(_opts);
auto* out = out_dir->createOutput(idx_name.c_str());
if (out == nullptr) {
@@ -348,6 +349,8 @@ size_t InvertedIndexFileWriter::write_v2() {
io::Path index_path
{InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};
auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs,
index_path.parent_path().c_str());
+ out_dir->set_file_writer_opts(_opts);
+
std::unique_ptr<lucene::store::IndexOutput> compound_file_output;
// idx v2 writer != nullptr means memtable on sink node now
if (_idx_v2_writer != nullptr) {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
index b9f9b983e44..024c1dec986 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
@@ -71,6 +71,8 @@ public:
lucene::store::IndexOutput* output, uint8_t* buffer, int64_t
bufferLength);
InvertedIndexStorageFormatPB get_storage_format() const { return
_storage_format; }
+ void set_file_writer_opts(const io::FileWriterOptions& opts) { _opts =
opts; }
+
private:
InvertedIndexDirectoryMap _indices_dirs;
const io::FileSystemSPtr _fs;
@@ -81,6 +83,7 @@ private:
size_t _file_size = 0;
// write to disk or stream
io::FileWriterPtr _idx_v2_writer;
+ io::FileWriterOptions _opts;
};
} // namespace segment_v2
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index 0443bf345ba..27e03b43da2 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -84,9 +84,6 @@ namespace doris::segment_v2 {
const char* const DorisFSDirectory::WRITE_LOCK_FILE = "write.lock";
class DorisFSDirectory::FSIndexOutput : public
lucene::store::BufferedIndexOutput {
-private:
- io::FileWriterPtr _writer;
-
protected:
void flushBuffer(const uint8_t* b, const int32_t size) override;
@@ -96,6 +93,12 @@ public:
~FSIndexOutput() override;
void close() override;
int64_t length() const override;
+
+ void set_file_writer_opts(const io::FileWriterOptions& opts) { _opts =
opts; }
+
+private:
+ io::FileWriterPtr _writer;
+ io::FileWriterOptions _opts;
};
class DorisFSDirectory::FSIndexOutputV2 : public
lucene::store::BufferedIndexOutput {
@@ -242,7 +245,13 @@ void DorisFSDirectory::FSIndexInput::readInternal(uint8_t*
b, const int32_t len)
}
void DorisFSDirectory::FSIndexOutput::init(const io::FileSystemSPtr& fs, const
char* path) {
- Status status = fs->create_file(path, &_writer);
+ DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput::init.file_cache", {
+ if (fs->type() == io::FileSystemType::S3 && _opts.write_file_cache ==
false) {
+ _CLTHROWA(CL_ERR_IO, "Inverted index failed to enter file cache");
+ }
+ });
+
+ Status status = fs->create_file(path, &_writer, &_opts);
DBUG_EXECUTE_IF(
"DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_"
"init",
@@ -579,6 +588,7 @@ lucene::store::IndexOutput*
DorisFSDirectory::createOutput(const char* name) {
assert(!exists);
}
auto* ret = _CLNEW FSIndexOutput();
+ ret->set_file_writer_opts(_opts);
try {
ret->init(_fs, fl);
} catch (CLuceneError& err) {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
index b3e0352d7ad..357ac65c678 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
@@ -29,6 +29,7 @@
#include "CLucene/SharedHeader.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/file_system.h"
+#include "io/fs/file_writer.h"
#include "io/io_common.h"
class CLuceneError;
@@ -46,8 +47,6 @@ class CLUCENE_EXPORT DorisFSDirectory : public
lucene::store::Directory {
public:
static const char* const WRITE_LOCK_FILE;
static const int64_t MAX_HEADER_DATA_SIZE = 1024 * 128; // 128k
-private:
- int filemode;
protected:
mutable std::mutex _this_lock;
@@ -91,6 +90,12 @@ public:
virtual void init(const io::FileSystemSPtr& fs, const char* path,
lucene::store::LockFactory* lock_factory = nullptr);
+
+ void set_file_writer_opts(const io::FileWriterOptions& opts) { _opts =
opts; }
+
+private:
+ int32_t filemode;
+ io::FileWriterOptions _opts;
};
class CLUCENE_EXPORT DorisRAMFSDirectory : public DorisFSDirectory {
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 36b200fe8e3..f20af3df80a 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -139,6 +139,8 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer,
uint32_t segment_id,
_opts.rowset_ctx->rowset_id.to_string(), segment_id,
_tablet_schema->get_inverted_index_storage_format(),
std::move(inverted_file_writer));
+ _inverted_index_file_writer->set_file_writer_opts(
+ _opts.rowset_ctx->get_file_writer_options());
}
}
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index aa9376a8d78..3e23b1fda52 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -120,6 +120,8 @@
VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32
_opts.rowset_ctx->rowset_id.to_string(), segment_id,
_tablet_schema->get_inverted_index_storage_format(),
std::move(inverted_file_writer));
+ _inverted_index_file_writer->set_file_writer_opts(
+ _opts.rowset_ctx->get_file_writer_options());
}
}
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index 1db74843697..ee687d18edc 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -165,14 +165,7 @@ Status VerticalBetaRowsetWriter<T>::_create_segment_writer(
int seg_id = this->_num_segment.fetch_add(1, std::memory_order_relaxed);
io::FileWriterPtr file_writer;
- io::FileWriterOptions opts {
- .write_file_cache = this->_context.write_file_cache,
- .is_cold_data = this->_context.is_hot_data,
- .file_cache_expiration = this->_context.file_cache_ttl_sec > 0 &&
- this->_context.newest_write_timestamp > 0
- ? this->_context.newest_write_timestamp +
- this->_context.file_cache_ttl_sec
- : 0};
+ io::FileWriterOptions opts = this->_context.get_file_writer_options();
auto path = context.segment_path(seg_id);
auto& fs = context.fs_ref();
diff --git
a/regression-test/suites/fault_injection_p0/test_index_writer_file_cache.groovy
b/regression-test/suites/fault_injection_p0/test_index_writer_file_cache.groovy
new file mode 100644
index 00000000000..b26794e3671
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_index_writer_file_cache.groovy
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+suite("test_index_writer_file_cache_fault_injection", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return;
+ }
+
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def testTable1 = "test_index_writer_file_cache_fault_injection_1"
+ def testTable2 = "test_index_writer_file_cache_fault_injection_2"
+
+ sql "DROP TABLE IF EXISTS ${testTable1}"
+ sql """
+ CREATE TABLE ${testTable1} (
+ `@timestamp` int(11) NULL COMMENT "",
+ `clientip` string NULL COMMENT "",
+ `request` string NULL COMMENT "",
+ `status` int(11) NULL COMMENT "",
+ `size` int(11) NULL COMMENT "",
+ INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
+ INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT '',
+ INDEX status_idx (`status`) USING INVERTED COMMENT '',
+ INDEX size_idx (`size`) USING INVERTED COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`@timestamp`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ sql "DROP TABLE IF EXISTS ${testTable2}"
+ sql """
+ CREATE TABLE ${testTable2} (
+ `@timestamp` int(11) NULL COMMENT "",
+ `clientip` string NULL COMMENT "",
+ `request` string NULL COMMENT "",
+ `status` int(11) NULL COMMENT "",
+ `size` int(11) NULL COMMENT "",
+ INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
+ INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT '',
+ INDEX status_idx (`status`) USING INVERTED COMMENT '',
+ INDEX size_idx (`size`) USING INVERTED COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`@timestamp`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ def insert_and_compaction = { tableName ->
+ sql """ INSERT INTO ${tableName} VALUES (893964617, '40.135.0.0', 'GET /images/hm_bg.jpg HTTP/1.0', 200, 24736); """
+ sql """ INSERT INTO ${tableName} VALUES (893964653, '232.0.0.0', 'GET /images/hm_bg.jpg HTTP/1.0', 200, 3781); """
+ sql """ INSERT INTO ${tableName} VALUES (893964672, '26.1.0.0', 'GET /images/hm_bg.jpg HTTP/1.0', 304, 0); """
+
+ def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ String backend_id = tablet.BackendId
+ def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ assertEquals("success", compactJson.status.toLowerCase())
+ }
+
+ for (def tablet in tablets) {
+ boolean running = true
+ do {
+ Thread.sleep(1000)
+ String tablet_id = tablet.TabletId
+ String backend_id = tablet.BackendId
+ def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success", compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } while (running)
+ }
+ }
+
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput::init.file_cache")
+
+ insert_and_compaction.call(testTable1);
+ insert_and_compaction.call(testTable2);
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput::init.file_cache")
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]