This is an automated email from the ASF dual-hosted git repository.
lide pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 7103872fc20 [branch-2.0](recover) support skipping missing version in
select by session variable (#34931)
7103872fc20 is described below
commit 7103872fc20ab67d47deeda2ab571d0e9a1bcc17
Author: xy720 <[email protected]>
AuthorDate: Thu May 16 15:29:38 2024 +0800
[branch-2.0](recover) support skipping missing version in select by session
variable (#34931)
---
be/src/olap/schema_change.cpp | 2 +-
be/src/olap/tablet.cpp | 23 +++++++-----
be/src/olap/tablet.h | 8 +++--
be/src/runtime/runtime_state.h | 4 +++
be/src/vec/exec/scan/new_olap_scan_node.cpp | 2 +-
be/src/vec/exec/scan/new_olap_scanner.cpp | 3 +-
be/test/olap/tablet_test.cpp | 4 +--
.../main/java/org/apache/doris/common/Config.java | 14 --------
.../java/org/apache/doris/catalog/Replica.java | 18 ++++++++++
.../main/java/org/apache/doris/catalog/Tablet.java | 4 +--
.../org/apache/doris/planner/OlapScanNode.java | 32 ++++++++++-------
.../java/org/apache/doris/qe/SessionVariable.java | 17 +++++++++
gensrc/thrift/PaloInternalService.thrift | 2 ++
.../session_variable/test_skip_missing_version.out | 4 +++
.../test_skip_missing_version.groovy | 42 ++++++++++++++++++++++
15 files changed, 135 insertions(+), 44 deletions(-)
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 4ea1ccbc3ff..80dc6003619 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1004,7 +1004,7 @@ Status SchemaChangeHandler::_get_versions_to_be_changed(
*max_rowset = rowset;
RETURN_IF_ERROR(base_tablet->capture_consistent_versions(Version(0,
rowset->version().second),
-
versions_to_be_changed));
+
versions_to_be_changed, false, false));
return Status::OK();
}
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 51811c2d22d..0d1898a5999 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -758,7 +758,7 @@ void Tablet::delete_expired_stale_rowset() {
Version test_version = Version(0, lastest_delta->end_version());
stale_version_path_map[*path_id_iter] = version_path;
- Status status = capture_consistent_versions(test_version, nullptr);
+ Status status = capture_consistent_versions(test_version, nullptr,
false, false);
// 1. When there is no consistent versions, we must reconstruct
the tracker.
if (!status.ok()) {
// 2. fetch missing version after delete
@@ -882,7 +882,8 @@ bool Tablet::_reconstruct_version_tracker_if_necessary() {
}
Status Tablet::capture_consistent_versions(const Version& spec_version,
- std::vector<Version>* version_path,
bool quiet) const {
+ std::vector<Version>* version_path,
+ bool skip_missing_version, bool
quiet) const {
Status status =
_timestamped_version_tracker.capture_consistent_versions(spec_version,
version_path);
if (!status.ok() && !quiet) {
@@ -905,6 +906,10 @@ Status Tablet::capture_consistent_versions(const Version&
spec_version,
LOG(WARNING) << "status:" << status << ", tablet:" <<
full_name()
<< ", missed version for version:" <<
spec_version;
_print_missed_versions(missed_versions);
+ if (skip_missing_version) {
+ LOG(WARNING) << "force skipping missing version for
tablet:" << full_name();
+ return Status::OK();
+ }
}
}
}
@@ -913,7 +918,7 @@ Status Tablet::capture_consistent_versions(const Version&
spec_version,
Status Tablet::check_version_integrity(const Version& version, bool quiet) {
std::shared_lock rdlock(_meta_lock);
- return capture_consistent_versions(version, nullptr, quiet);
+ return capture_consistent_versions(version, nullptr, false, quiet);
}
bool Tablet::exceed_version_limit(int32_t limit) const {
@@ -946,7 +951,7 @@ void Tablet::acquire_version_and_rowsets(
Status Tablet::capture_consistent_rowsets(const Version& spec_version,
std::vector<RowsetSharedPtr>*
rowsets) const {
std::vector<Version> version_path;
- RETURN_IF_ERROR(capture_consistent_versions(spec_version, &version_path));
+ RETURN_IF_ERROR(capture_consistent_versions(spec_version, &version_path,
false, false));
RETURN_IF_ERROR(_capture_consistent_rowsets_unlocked(version_path,
rowsets));
return Status::OK();
}
@@ -982,10 +987,11 @@ Status Tablet::_capture_consistent_rowsets_unlocked(const
std::vector<Version>&
return Status::OK();
}
-Status Tablet::capture_rs_readers(const Version& spec_version,
- std::vector<RowSetSplits>* rs_splits) const {
+Status Tablet::capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
+ bool skip_missing_version) const {
std::vector<Version> version_path;
- RETURN_IF_ERROR(capture_consistent_versions(spec_version, &version_path));
+ RETURN_IF_ERROR(
+ capture_consistent_versions(spec_version, &version_path,
skip_missing_version, false));
RETURN_IF_ERROR(capture_rs_readers(version_path, rs_splits));
return Status::OK();
}
@@ -3722,7 +3728,8 @@ Status Tablet::check_rowid_conversion(
Status Tablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet*
rowset_ids) const {
// Ensure that the obtained versions of rowsets are continuous
std::vector<Version> version_path;
- RETURN_IF_ERROR(capture_consistent_versions(Version(0, max_version),
&version_path));
+ RETURN_IF_ERROR(
+ capture_consistent_versions(Version(0, max_version),
&version_path, false, false));
for (auto& ver : version_path) {
if (ver.second == 1) {
// [0-1] rowset is empty for each tablet, skip it
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 775bfa9262b..45694e30602 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -176,9 +176,10 @@ public:
// Given spec_version, find a continuous version path and store it in
version_path.
// If quiet is true, then only "does this path exist" is returned.
+ // If skip_missing_version is true, return OK even if there are missing
versions.
Status capture_consistent_versions(const Version& spec_version,
std::vector<Version>* version_path,
- bool quiet = false) const;
+ bool skip_missing_version, bool quiet =
false) const;
// if quiet is true, no error log will be printed if there are missing
versions
Status check_version_integrity(const Version& version, bool quiet = false);
bool check_version_exist(const Version& version) const;
@@ -187,8 +188,9 @@ public:
Status capture_consistent_rowsets(const Version& spec_version,
std::vector<RowsetSharedPtr>* rowsets)
const;
- Status capture_rs_readers(const Version& spec_version,
- std::vector<RowSetSplits>* rs_splits) const;
+ // If skip_missing_version is true, skip versions if they are missing.
+ Status capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
+ bool skip_missing_version) const;
Status capture_rs_readers(const std::vector<Version>& version_path,
std::vector<RowSetSplits>* rs_splits) const;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 8e57638dee6..246d5a54783 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -361,6 +361,10 @@ public:
return _query_options.__isset.skip_delete_bitmap &&
_query_options.skip_delete_bitmap;
}
+ bool skip_missing_version() const {
+ return _query_options.__isset.skip_missing_version &&
_query_options.skip_missing_version;
+ }
+
bool enable_page_cache() const;
int partitioned_hash_join_rows_threshold() const {
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index e442b4b4e4f..4dc56c3f44c 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -479,7 +479,7 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
auto& read_source = tablets_read_source.emplace_back();
{
std::shared_lock rdlock(tablet->get_header_lock());
- auto st = tablet->capture_rs_readers({0, version},
&read_source.rs_splits);
+ auto st = tablet->capture_rs_readers({0, version},
&read_source.rs_splits, false);
if (!st.ok()) {
LOG(WARNING) << "fail to init reader.res=" << st;
return Status::InternalError(
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index f7e839a89b9..6d9a7964dff 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -186,7 +186,8 @@ Status NewOlapScanner::init() {
ReadSource read_source;
{
std::shared_lock rdlock(_tablet->get_header_lock());
- auto st = _tablet->capture_rs_readers(rd_version,
&read_source.rs_splits);
+ auto st = _tablet->capture_rs_readers(rd_version,
&read_source.rs_splits,
+
_state->skip_missing_version());
if (!st.ok()) {
LOG(WARNING) << "fail to init reader.res=" << st;
return st;
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index 951d98b1e05..5889846a49e 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -293,12 +293,12 @@ TEST_F(TestTablet, pad_rowset) {
Version version(5, 5);
std::vector<RowSetSplits> splits;
- ASSERT_FALSE(_tablet->capture_rs_readers(version, &splits).ok());
+ ASSERT_FALSE(_tablet->capture_rs_readers(version, &splits, false).ok());
splits.clear();
PadRowsetAction action(nullptr, TPrivilegeHier::GLOBAL,
TPrivilegeType::ADMIN);
action._pad_rowset(_tablet, version);
- ASSERT_TRUE(_tablet->capture_rs_readers(version, &splits).ok());
+ ASSERT_TRUE(_tablet->capture_rs_readers(version, &splits, false).ok());
}
TEST_F(TestTablet, cooldown_policy) {
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c1c03754c99..47561c4dd22 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1286,20 +1286,6 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static boolean recover_with_empty_tablet = false;
- /**
- * In some scenarios, there is an unrecoverable metadata problem in the
cluster,
- * and the visibleVersion of the data does not match be. In this case, it
is still
- * necessary to restore the remaining data (which may cause problems with
the correctness of the data).
- * This configuration is the same as` recover_with_empty_tablet` should
only be used in emergency situations
- * This configuration has three values:
- * disable : If an exception occurs, an error will be reported normally.
- * ignore_version: ignore the visibleVersion information recorded in fe
partition, use replica version
- * ignore_all: In addition to ignore_version, when encountering no
queryable replica,
- * skip it directly instead of throwing an exception
- */
- @ConfField(mutable = true, masterOnly = true)
- public static String recover_with_skip_missing_version = "disable";
-
/**
* Whether to add a delete sign column when create unique table
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index fc4208add56..0c5bf7a54b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -37,6 +37,8 @@ import java.util.Comparator;
public class Replica implements Writable {
private static final Logger LOG = LogManager.getLogger(Replica.class);
public static final VersionComparator<Replica> VERSION_DESC_COMPARATOR =
new VersionComparator<Replica>();
+ public static final LastSuccessVersionComparator<Replica>
LAST_SUCCESS_VERSION_COMPARATOR =
+ new LastSuccessVersionComparator<Replica>();
public static final IdComparator<Replica> ID_COMPARATOR = new
IdComparator<Replica>();
public enum ReplicaState {
@@ -662,6 +664,22 @@ public class Replica implements Writable {
}
}
+ private static class LastSuccessVersionComparator<T extends Replica>
implements Comparator<T> {
+ public LastSuccessVersionComparator() {
+ }
+
+ @Override
+ public int compare(T replica1, T replica2) {
+ if (replica1.getLastSuccessVersion() <
replica2.getLastSuccessVersion()) {
+ return 1;
+ } else if (replica1.getLastSuccessVersion() ==
replica2.getLastSuccessVersion()) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+ }
+
private static class IdComparator<T extends Replica> implements
Comparator<T> {
public IdComparator() {
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 7d65b6b95a2..19d159db40b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -238,7 +238,7 @@ public class Tablet extends MetaObject implements Writable {
}
// for query
- public List<Replica> getQueryableReplicas(long visibleVersion) {
+ public List<Replica> getQueryableReplicas(long visibleVersion, boolean
allowFailedVersion) {
List<Replica> allQueryableReplica =
Lists.newArrayListWithCapacity(replicas.size());
List<Replica> auxiliaryReplica =
Lists.newArrayListWithCapacity(replicas.size());
for (Replica replica : replicas) {
@@ -247,7 +247,7 @@ public class Tablet extends MetaObject implements Writable {
}
// Skip the missing version replica
- if (replica.getLastFailedVersion() > 0) {
+ if (replica.getLastFailedVersion() > 0 && !allowFailedVersion) {
continue;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index f7263980136..edc440e8866 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -726,10 +726,15 @@ public class OlapScanNode extends ScanNode {
String visibleVersionStr = String.valueOf(visibleVersion);
Set<Tag> allowedTags = Sets.newHashSet();
+ int useFixReplica = -1;
boolean needCheckTags = false;
+ boolean skipMissingVersion = false;
if (ConnectContext.get() != null) {
allowedTags = ConnectContext.get().getResourceTags();
needCheckTags = ConnectContext.get().isResourceTagsSet();
+ useFixReplica =
ConnectContext.get().getSessionVariable().useFixReplica;
+ // if use_fix_replica is set (i.e. not -1), force skip_missing_version to
false
+ skipMissingVersion = useFixReplica == -1 &&
ConnectContext.get().getSessionVariable().skipMissingVersion;
if (LOG.isDebugEnabled()) {
LOG.debug("query id: {}, partition id:{} visibleVersion: {}",
DebugUtil.printId(ConnectContext.get().queryId()),
partition.getId(), visibleVersion);
@@ -737,7 +742,7 @@ public class OlapScanNode extends ScanNode {
}
for (Tablet tablet : tablets) {
long tabletId = tablet.getId();
- if
(!Config.recover_with_skip_missing_version.equalsIgnoreCase("disable")) {
+ if (skipMissingVersion) {
long tabletVersion = -1L;
for (Replica replica : tablet.getReplicas()) {
if (replica.getVersion() > tabletVersion) {
@@ -760,7 +765,7 @@ public class OlapScanNode extends ScanNode {
paloRange.setTabletId(tabletId);
// random shuffle List && only collect one copy
- List<Replica> replicas =
tablet.getQueryableReplicas(visibleVersion);
+ List<Replica> replicas =
tablet.getQueryableReplicas(visibleVersion, skipMissingVersion);
if (replicas.isEmpty()) {
LOG.warn("no queryable replica found in tablet {}. visible
version {}", tabletId, visibleVersion);
StringBuilder sb = new StringBuilder(
@@ -774,12 +779,14 @@ public class OlapScanNode extends ScanNode {
throw new UserException(sb.toString());
}
- int useFixReplica = -1;
- if (ConnectContext.get() != null) {
- useFixReplica =
ConnectContext.get().getSessionVariable().useFixReplica;
- }
if (useFixReplica == -1) {
Collections.shuffle(replicas);
+ if (skipMissingVersion) {
+ // sort by replica's last success version, highest success
version first.
+ replicas.sort(Replica.LAST_SUCCESS_VERSION_COMPARATOR);
+ } else {
+ Collections.shuffle(replicas);
+ }
} else {
LOG.debug("use fix replica, value: {}, replica num: {}",
useFixReplica, replicas.size());
// sort by replica id
@@ -849,14 +856,15 @@ public class OlapScanNode extends ScanNode {
collectedStat = true;
}
scanBackendIds.add(backend.getId());
+ // When skipping missing versions, only select the
backend holding the replica with the highest last
+ // success version, to recover as much data as possible.
+ if (skipMissingVersion) {
+ break;
+ }
}
if (tabletIsNull) {
- if
(Config.recover_with_skip_missing_version.equalsIgnoreCase("ignore_all")) {
- continue;
- } else {
- throw new UserException(tabletId + " have no queryable
replicas. err: "
- + Joiner.on(", ").join(errs));
- }
+ throw new UserException(tabletId + " have no queryable
replicas. err: "
+ + Joiner.on(", ").join(errs));
}
TScanRange scanRange = new TScanRange();
scanRange.setPaloScanRange(paloRange);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 56743ba04c5..27db5f2a0c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -294,6 +294,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String SKIP_DELETE_BITMAP = "skip_delete_bitmap";
+ public static final String SKIP_MISSING_VERSION = "skip_missing_version";
+
public static final String ENABLE_NEW_SHUFFLE_HASH_METHOD =
"enable_new_shuffle_hash_method";
public static final String ENABLE_PUSH_DOWN_NO_GROUP_AGG =
"enable_push_down_no_group_agg";
@@ -988,6 +990,19 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = SKIP_DELETE_BITMAP)
public boolean skipDeleteBitmap = false;
+ // This variable replaces the original FE config
`recover_with_skip_missing_version`.
+ // In some scenarios, all replicas of a tablet have missing versions,
and the tablet cannot be recovered.
+ // This variable controls query behavior in that case. When it is set to
`true`, the query ignores the
+ // visible version recorded in the FE partition and uses the replica version instead. If
the replica on BE has missing versions,
+ // the query skips the missing versions and only returns the
data of the existing versions.
+ // In addition, the query always tries to select the replica with the highest
lastSuccessVersion among all surviving
+ // BE replicas, so as to recover as much data as possible.
+ // Enable it only in the emergency scenarios mentioned above,
and only for temporary recovery queries.
+ // This variable conflicts with the use_fix_replica variable: when the
use_fix_replica variable is not -1,
+ // this variable has no effect.
+ @VariableMgr.VarAttr(name = SKIP_MISSING_VERSION)
+ public boolean skipMissingVersion = false;
+
// This variable is used to avoid FE fallback to the original parser. When
we execute SQL in regression tests
// for nereids, fallback will cause the Doris return the correct result
although the syntax is unsupported
// in nereids for some mistaken modification. You should set it on the
@@ -2590,6 +2605,8 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setEnableInvertedIndexCompoundInlist(enableInvertedIndexCompoundInlist);
+ tResult.setSkipMissingVersion(skipMissingVersion);
+
return tResult;
}
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 22422aeabac..37449a4d638 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -255,6 +255,8 @@ struct TQueryOptions {
89: optional i32 inverted_index_skip_threshold = 50;
90: optional bool enable_inverted_index_compound_inlist = false;
+ // For emergency use, skip missing version when reading rowsets
+ 91: optional bool skip_missing_version = false;
}
diff --git
a/regression-test/data/query_p0/session_variable/test_skip_missing_version.out
b/regression-test/data/query_p0/session_variable/test_skip_missing_version.out
new file mode 100644
index 00000000000..2905460928e
--- /dev/null
+++
b/regression-test/data/query_p0/session_variable/test_skip_missing_version.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_all --
+1000 a 10
+2000 b 10
diff --git
a/regression-test/suites/query_p0/session_variable/test_skip_missing_version.groovy
b/regression-test/suites/query_p0/session_variable/test_skip_missing_version.groovy
new file mode 100644
index 00000000000..859fa3ca680
--- /dev/null
+++
b/regression-test/suites/query_p0/session_variable/test_skip_missing_version.groovy
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_skip_missing_version") {
+ def test_tbl = "test_skip_missing_version_tbl"
+
+ sql """ DROP TABLE IF EXISTS ${test_tbl}"""
+ sql """
+ CREATE TABLE ${test_tbl} (
+ `k1` int(11) NULL,
+ `k2` char(5) NULL,
+ `k3` tinyint(4) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`, `k2`, `k3`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+
+ sql """ INSERT INTO ${test_tbl} VALUES(1000, 'a', 10); """
+ sql """ INSERT INTO ${test_tbl} VALUES(2000, 'b', 10); """
+
+ // This case cannot verify the results, but it can verify that no abnormality
occurs
+ // after SET skip_missing_version=true
+ sql """ SET skip_missing_version=true """
+ qt_select_all """ select * from ${test_tbl} order by k1 """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]