This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.0 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit aad6cb57d458b790b6e0dced75f86b6499b39b1c Author: Mingyu Chen <[email protected]> AuthorDate: Wed Mar 9 13:03:22 2022 +0800 [fix](replica) handle replica version missing info to avoid -214 error (#8209) In the original tablet reporting information, the version missing information is determined by combining two pieces of information as follows: 1. the maximum consecutive version number 2. the `version_miss` field The logic of this approach is confusing and inconsistent with the logic of checking for missing versions when querying. After the change, we directly use the version checking logic used in the query, set `version_miss` to true if a missing version is found, and handle it accordingly on the FE processing side. Originally, only the **bad replica** information was synchronized among FEs, but not the **version missing** information. As a result, the non-master FE is not aware of the missing version information. In the new design, we deprecate the original log persistence class `BackendTabletsInfo` and use the new `BackendReplicasInfo` to record replica reporting information and write both **bad** and **version missing** information to metadata so that other FEs can synchronize this information. 
--- be/src/olap/tablet.cpp | 23 ++-- be/src/olap/tablet.h | 8 +- .../java/org/apache/doris/catalog/Catalog.java | 33 +++++ .../java/org/apache/doris/catalog/EsTable.java | 3 +- .../java/org/apache/doris/catalog/Replica.java | 58 +++++---- .../apache/doris/catalog/TabletInvertedIndex.java | 10 +- .../doris/external/elasticsearch/EsRestClient.java | 17 ++- .../org/apache/doris/journal/JournalEntity.java | 6 + .../main/java/org/apache/doris/load/LoadJob.java | 145 +++++++++++---------- .../org/apache/doris/master/ReportHandler.java | 42 ++---- .../apache/doris/persist/BackendReplicasInfo.java | 105 +++++++++++++++ .../apache/doris/persist/BackendTabletsInfo.java | 2 + .../java/org/apache/doris/persist/EditLog.java | 10 ++ .../org/apache/doris/persist/OperationType.java | 2 + .../doris/persist/BackendReplicaInfosTest.java | 81 ++++++++++++ 15 files changed, 391 insertions(+), 154 deletions(-) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index f9856d7..a6f5648 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -552,10 +552,11 @@ bool Tablet::_reconstruct_version_tracker_if_necessary() { } OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version, - std::vector<Version>* version_path) const { + std::vector<Version>* version_path, + bool quiet) const { OLAPStatus status = _timestamped_version_tracker.capture_consistent_versions(spec_version, version_path); - if (status != OLAP_SUCCESS) { + if (status != OLAP_SUCCESS && !quiet) { std::vector<Version> missed_versions; calc_missed_versions_unlocked(spec_version.second, &missed_versions); if (missed_versions.empty()) { @@ -577,9 +578,9 @@ OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version, return status; } -OLAPStatus Tablet::check_version_integrity(const Version& version) { +OLAPStatus Tablet::check_version_integrity(const Version& version, bool quiet) { ReadLock rdlock(&_meta_lock); - return capture_consistent_versions(version, nullptr); + return 
capture_consistent_versions(version, nullptr, quiet); } // If any rowset contains the specific version, it means the version already exist @@ -1245,14 +1246,10 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) { tablet_info->schema_hash = _tablet_meta->schema_hash(); tablet_info->row_count = _tablet_meta->num_rows(); tablet_info->data_size = _tablet_meta->tablet_footprint(); - Version version = {-1, 0}; - _max_continuous_version_from_beginning_unlocked(&version); + + tablet_info->__set_version_miss(false); auto max_rowset = rowset_with_max_version(); - if (max_rowset != nullptr) { - if (max_rowset->version() != version) { - tablet_info->__set_version_miss(true); - } - } else { + if (max_rowset == nullptr) { // If the tablet is in running state, it must not be doing schema-change. so if we can not // access its rowsets, it means that the tablet is bad and needs to be reported to the FE // for subsequent repairs (through the cloning task) @@ -1263,8 +1260,10 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) { // still sets the state to normal when reporting. Note that every task has an timeout, // so if the task corresponding to this change hangs, when the task timeout, FE will know // and perform state modification operations. + } else { + tablet_info->__set_version_miss(check_version_integrity({0, max_rowset->version().second}, true)); } - tablet_info->version = version.second; + tablet_info->version = max_rowset->version().second; // Useless but it is a required filed in TTabletInfo tablet_info->version_hash = 0; tablet_info->__set_partition_id(_tablet_meta->partition_id()); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 52f58ae..b11fedf 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -115,9 +115,13 @@ public: /// need to delete flag. void delete_expired_stale_rowset(); + // Given spec_version, find a continuous version path and store it in version_path. 
+ // If quiet is true, then only "does this path exist" is returned. OLAPStatus capture_consistent_versions(const Version& spec_version, - std::vector<Version>* version_path) const; - OLAPStatus check_version_integrity(const Version& version); + std::vector<Version>* version_path, + bool quiet = false) const; + // if quiet is true, no error log will be printed if there are missing versions + OLAPStatus check_version_integrity(const Version& version, bool quiet = false); bool check_version_exist(const Version& version) const; void acquire_version_and_rowsets(std::vector<std::pair<Version, RowsetSharedPtr>>* version_rowsets) const; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 45598ab..e2bf25e 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -188,6 +188,7 @@ import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.PaloAuth; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.BackendIdsUpdateInfo; +import org.apache.doris.persist.BackendReplicasInfo; import org.apache.doris.persist.BackendTabletsInfo; import org.apache.doris.persist.ClusterInfo; import org.apache.doris.persist.ColocatePersistInfo; @@ -745,12 +746,15 @@ public class Catalog { public StatisticsManager getStatisticsManager() { return statisticsManager; } + public StatisticsJobManager getStatisticsJobManager() { return statisticsJobManager; } + public StatisticsJobScheduler getStatisticsJobScheduler() { return statisticsJobScheduler; } + public StatisticsTaskScheduler getStatisticsTaskScheduler() { return statisticsTaskScheduler; } @@ -6992,6 +6996,35 @@ public class Catalog { } } + public void replayBackendReplicasInfo(BackendReplicasInfo backendReplicasInfo) { + long backendId = backendReplicasInfo.getBackendId(); + 
List<BackendReplicasInfo.ReplicaReportInfo> replicaInfos = backendReplicasInfo.getReplicaReportInfos(); + + for (BackendReplicasInfo.ReplicaReportInfo info : replicaInfos) { + Replica replica = tabletInvertedIndex.getReplica(info.tabletId, backendId); + if (replica == null) { + LOG.warn("failed to find replica of tablet {} on backend {} when replaying backend report info", + info.tabletId, backendId); + continue; + } + + switch (info.type) { + case BAD: + replica.setBad(true); + break; + case MISSING_VERSION: + // The absolute value is meaningless, as long as it is greater than 0. + // This way, in other checking logic, if lastFailedVersion is found to be greater than 0, + // it will be considered a version missing replica and will be handled accordingly. + replica.setLastFailedVersion(1L); + break; + default: + break; + } + } + } + + @Deprecated public void replayBackendTabletsInfo(BackendTabletsInfo backendTabletsInfo) { List<Pair<Long, Integer>> tabletsWithSchemaHash = backendTabletsInfo.getTabletSchemaHash(); if (!tabletsWithSchemaHash.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java index cbd2b77..824b54c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java @@ -458,7 +458,8 @@ public class EsTable extends Table { esMetaStateTracker.run(); this.esTablePartitions = esMetaStateTracker.searchContext().tablePartitions(); } catch (Throwable e) { - LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", this.name, e); + LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster." 
+ + "table id: {}, err: {}", this.name, this.id, e.getMessage()); this.esTablePartitions = null; this.lastMetaDataSyncException = e; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 5550056..2bd3030 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -56,7 +56,7 @@ public class Replica implements Writable { return this == NORMAL || this == SCHEMA_CHANGE; } } - + public enum ReplicaStatus { OK, // health DEAD, // backend is not available @@ -65,7 +65,7 @@ public class Replica implements Writable { SCHEMA_ERROR, // replica's schema hash does not equal to index's schema hash BAD // replica is broken. } - + @SerializedName(value = "id") private long id; @SerializedName(value = "backendId") @@ -96,7 +96,7 @@ public class Replica implements Writable { @SerializedName(value = "lastSuccessVersionHash") private long lastSuccessVersionHash = 0L; - private volatile long versionCount = -1; + private volatile long versionCount = -1; private long pathHash = -1; @@ -110,7 +110,7 @@ public class Replica implements Writable { * So this replica need a further repair. * If we do not do this, this replica will be treated as version stale, and will be removed, * so that the balance task is failed, which is unexpected. - * + * * furtherRepairSetTime set alone with needFurtherRepair. * This is an insurance, in case that further repair task always fail. If 20 min passed * since we set needFurtherRepair to true, the 'needFurtherRepair' will be set to false. 
@@ -125,13 +125,13 @@ public class Replica implements Writable { public Replica() { } - + // for rollup // the new replica's version is -1 and last failed version is -1 public Replica(long replicaId, long backendId, int schemaHash, ReplicaState state) { this(replicaId, backendId, -1, 0, schemaHash, 0L, 0L, state, -1, 0, -1, 0); } - + // for create tablet and restore public Replica(long replicaId, long backendId, ReplicaState state, long version, long versionHash, int schemaHash) { this(replicaId, backendId, version, versionHash, schemaHash, 0L, 0L, state, -1L, 0L, version, versionHash); @@ -166,7 +166,7 @@ public class Replica implements Writable { this.lastSuccessVersionHash = lastSuccessVersionHash; } } - + public long getVersion() { return this.version; } @@ -191,7 +191,7 @@ public class Replica implements Writable { public long getBackendId() { return this.backendId; } - + public long getDataSize() { return dataSize; } @@ -211,7 +211,7 @@ public class Replica implements Writable { public long getLastFailedTimestamp() { return lastFailedTimestamp; } - + public long getLastSuccessVersion() { return lastSuccessVersion; } @@ -269,7 +269,7 @@ public class Replica implements Writable { updateReplicaInfo(newVersion, newVersionHash, lastFailedVersion, lastFailedVersionHash, lastSuccessVersion, lastSuccessVersionHash, dataSize, rowCount); } - + public void updateVersionInfoForRecovery( long newVersion, long newVersionHash, long lastFailedVersion, long lastFailedVersionHash, @@ -292,26 +292,26 @@ public class Replica implements Writable { /* last failed version: LFV * last success version: LSV * version: V - * + * * Case 1: * If LFV > LSV, set LSV back to V, which indicates that version between LSV and LFV is invalid. * Clone task will clone the version between LSV and LFV - * + * * Case 2: * LFV changed, set LSV back to V. This is just same as Case 1. Cause LFV must large than LSV. 
- * + * * Case 3: * LFV remains unchanged, just update LSV, and then check if it falls into Case 1. - * + * * Case 4: * V is larger or equal to LFV, reset LFV. And if V is less than LSV, just set V to LSV. This may * happen when a clone task finished and report version V, but the LSV is already larger than V, * And we know that version between V and LSV is valid, so move V forward to LSV. - * + * * Case 5: * This is a bug case, I don't know why, may be some previous version introduce it. It looks like * the V(hash) equals to LSV(hash), and V equals to LFV, but LFV hash is 0 or some unknown number. - * We just reset the LFV(hash) to recovery this replica. + * We just reset the LFV(hash) to recovery this replica. */ private void updateReplicaInfo(long newVersion, long newVersionHash, long lastFailedVersion, long lastFailedVersionHash, @@ -352,14 +352,14 @@ public class Replica implements Writable { this.lastSuccessVersion = this.version; this.lastSuccessVersionHash = this.versionHash; } - + // TODO: this case is unknown, add log to observe if (this.version > lastFailedVersion && lastFailedVersion > 0) { LOG.debug("current version {} is larger than last failed version {}, " + "last failed version hash {}, maybe a fatal error or be report version, print a stack here ", this.version, lastFailedVersion, lastFailedVersionHash, new Exception()); } - + if (lastFailedVersion != this.lastFailedVersion) { // Case 2: if (lastFailedVersion > this.lastFailedVersion) { @@ -381,7 +381,7 @@ public class Replica implements Writable { this.lastSuccessVersionHash = this.versionHash; } } - + // Case 4: if (this.version >= this.lastFailedVersion) { this.lastFailedVersion = -1; @@ -431,6 +431,10 @@ public class Replica implements Writable { return true; } + public void setLastFailedVersion(long lastFailedVersion) { + this.lastFailedVersion = lastFailedVersion; + } + public void setState(ReplicaState replicaState) { this.state = replicaState; } @@ -488,13 +492,13 @@ public class Replica 
implements Writable { out.writeLong(dataSize); out.writeLong(rowCount); Text.writeString(out, state.name()); - + out.writeLong(lastFailedVersion); out.writeLong(lastFailedVersionHash); out.writeLong(lastSuccessVersion); out.writeLong(lastSuccessVersionHash); } - + public void readFields(DataInput in) throws IOException { id = in.readLong(); backendId = in.readLong(); @@ -510,13 +514,13 @@ public class Replica implements Writable { lastSuccessVersionHash = in.readLong(); } } - + public static Replica read(DataInput in) throws IOException { Replica replica = new Replica(); replica.readFields(in); return replica; } - + @Override public boolean equals(Object obj) { if (this == obj) { @@ -525,14 +529,14 @@ public class Replica implements Writable { if (!(obj instanceof Replica)) { return false; } - + Replica replica = (Replica) obj; - return (id == replica.id) - && (backendId == replica.backendId) + return (id == replica.id) + && (backendId == replica.backendId) && (version == replica.version) && (versionHash == replica.versionHash) && (dataSize == replica.dataSize) - && (rowCount == replica.rowCount) + && (rowCount == replica.rowCount) && (state.equals(replica.state)) && (lastFailedVersion == replica.lastFailedVersion) && (lastFailedVersionHash == replica.lastFailedVersionHash) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 1942894..77d72c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -17,9 +17,6 @@ package org.apache.doris.catalog; -import com.google.common.collect.ImmutableSet; -import org.apache.commons.lang3.tuple.ImmutableTriple; -import org.apache.commons.lang3.tuple.Triple; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.common.Config; import 
org.apache.doris.thrift.TPartitionVersionInfo; @@ -35,13 +32,16 @@ import org.apache.doris.transaction.TransactionStatus; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashBasedTable; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; -import com.google.common.collect.Ordering; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; import com.google.common.collect.Table; import com.google.common.collect.TreeMultimap; +import org.apache.commons.lang3.tuple.ImmutableTriple; +import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -176,7 +176,7 @@ public class TabletInvertedIndex { backendTabletInfo.getVersion(), backendTabletInfo.getVersionHash(), backendTabletInfo.getSchemaHash(), - backendTabletInfo.isSetUsed() ? backendTabletInfo.isUsed() : "unknown", + backendTabletInfo.isSetUsed() ? !backendTabletInfo.isUsed() : "false", backendTabletInfo.isSetVersionMiss() ? 
backendTabletInfo.isVersionMiss() : "unset"); synchronized (tabletRecoveryMap) { tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java index 7e1983a..467047a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java @@ -41,6 +41,7 @@ import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; + import okhttp3.Credentials; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -60,7 +61,7 @@ public class EsRestClient { private static OkHttpClient networkClient = new OkHttpClient.Builder() .readTimeout(10, TimeUnit.SECONDS) .build(); - + private static OkHttpClient sslNetworkClient; private Request.Builder builder; @@ -154,7 +155,7 @@ public class EsRestClient { } return EsShardPartitions.findShardPartitions(indexName, searchShards); } - + /** * init ssl networkClient use lazy way **/ @@ -217,7 +218,7 @@ public class EsRestClient { } selectNextNode(); } - LOG.warn("try all nodes [{}],no other nodes left", nodes); + LOG.warn("try all nodes [{}], no other nodes left", nodes); if (scratchExceptionForThrow != null) { throw scratchExceptionForThrow; } @@ -245,11 +246,15 @@ public class EsRestClient { * support https **/ private static class TrustAllCerts implements X509TrustManager { - public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {} + public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { + } - public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {} + public void checkServerTrusted(X509Certificate[] chain, String authType) throws 
CertificateException { + } - public X509Certificate[] getAcceptedIssuers() {return new X509Certificate[0];} + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } } private static class TrustAllHostnameVerifier implements HostnameVerifier { diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 801e75d..e35011c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -51,6 +51,7 @@ import org.apache.doris.mysql.privilege.UserPropertyInfo; import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.persist.AlterViewInfo; import org.apache.doris.persist.BackendIdsUpdateInfo; +import org.apache.doris.persist.BackendReplicasInfo; import org.apache.doris.persist.BackendTabletsInfo; import org.apache.doris.persist.BatchDropInfo; import org.apache.doris.persist.BatchModifyPartitionsInfo; @@ -486,6 +487,11 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_BACKEND_REPLICAS_INFO: { + data = BackendReplicasInfo.read(in); + isRead = true; + break; + } case OperationType.OP_CREATE_ROUTINE_LOAD_JOB: { data = RoutineLoadJob.read(in); isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java index c561895..d67a7a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java @@ -108,7 +108,7 @@ public class LoadJob implements Writable { private EtlJobType etlJobType; private EtlJobInfo etlJobInfo; - + private Map<Long, TableLoadInfo> idToTableLoadInfo; private Map<Long, TabletLoadInfo> idToTabletLoadInfo; private Set<Long> quorumTablets; @@ -116,14 +116,14 @@ public class LoadJob implements Writable { 
private List<Long> unfinishedTablets; private Set<PushTask> pushTasks; private Map<Long, ReplicaPersistInfo> replicaPersistInfos; - + private Map<Long, Replica> finishedReplicas; - + private List<Predicate> conditions = null; private DeleteInfo deleteInfo; private TResourceInfo resourceInfo; - + private TPriority priority; private long execMemLimit; @@ -138,15 +138,15 @@ public class LoadJob implements Writable { public LoadJob(String label) { this(label, DEFAULT_TIMEOUT_S, Config.default_max_filter_ratio); } - + // convert an async delete job to load job - public LoadJob(long id, long dbId, long tableId, long partitionId, String label, - Map<Long, Integer> indexIdToSchemaHash, List<Predicate> deleteConditions, - DeleteInfo deleteInfo) { + public LoadJob(long id, long dbId, long tableId, long partitionId, String label, + Map<Long, Integer> indexIdToSchemaHash, List<Predicate> deleteConditions, + DeleteInfo deleteInfo) { this.id = id; this.dbId = dbId; this.tableId = tableId; - this.label = label; + this.label = label; this.transactionId = -1; this.timestamp = -1; this.timeoutSecond = DEFAULT_TIMEOUT_S; @@ -168,8 +168,8 @@ public class LoadJob implements Writable { hadoopEtlJobInfo.setEtlOutputDir(""); this.etlJobInfo = hadoopEtlJobInfo; this.etlJobInfo.setJobStatus(etlStatus); - this.idToTableLoadInfo = Maps.newHashMap();; - this.idToTabletLoadInfo = Maps.newHashMap();; + this.idToTableLoadInfo = Maps.newHashMap(); + this.idToTabletLoadInfo = Maps.newHashMap(); this.quorumTablets = new HashSet<Long>(); this.fullTablets = new HashSet<Long>(); this.unfinishedTablets = new ArrayList<>(); @@ -179,7 +179,7 @@ public class LoadJob implements Writable { this.priority = TPriority.NORMAL; this.execMemLimit = DEFAULT_EXEC_MEM_LIMIT; this.finishedReplicas = Maps.newHashMap(); - + // generate table load info PartitionLoadInfo partitionLoadInfo = new PartitionLoadInfo(null); Map<Long, PartitionLoadInfo> idToPartitionLoadInfo = new HashMap<>(); @@ -187,7 +187,7 @@ public class 
LoadJob implements Writable { TableLoadInfo tableLoadInfo = new TableLoadInfo(idToPartitionLoadInfo); tableLoadInfo.addAllSchemaHash(indexIdToSchemaHash); idToTableLoadInfo.put(tableId, tableLoadInfo); - + // add delete conditions to load job this.conditions = deleteConditions; this.deleteInfo = deleteInfo; @@ -224,11 +224,11 @@ public class LoadJob implements Writable { this.execMemLimit = DEFAULT_EXEC_MEM_LIMIT; this.finishedReplicas = Maps.newHashMap(); } - + public void addTableName(String tableName) { tableNames.add(tableName); } - + public Set<String> getTableNames() { return tableNames; } @@ -254,11 +254,11 @@ public class LoadJob implements Writable { public void setDbId(long dbId) { this.dbId = dbId; } - + public long getTransactionId() { return transactionId; } - + public void setTransactionId(long transactionId) { this.transactionId = transactionId; } @@ -274,15 +274,15 @@ public class LoadJob implements Writable { public long getTimestamp() { return timestamp; } - + public void setTimeoutSecond(int timeoutSecond) { this.timeoutSecond = timeoutSecond; } - + public int getTimeoutSecond() { return timeoutSecond; } - + public void setMaxFilterRatio(double maxFilterRatio) { this.maxFilterRatio = maxFilterRatio; } @@ -310,7 +310,7 @@ public class LoadJob implements Writable { public long getCreateTimeMs() { return createTimeMs; } - + public void setCreateTimeMs(long createTimeMs) { this.createTimeMs = createTimeMs; } @@ -371,7 +371,7 @@ public class LoadJob implements Writable { break; } } - + public long getQuorumFinishTimeMs() { return quorumFinishTimeMs; } @@ -387,7 +387,7 @@ public class LoadJob implements Writable { public void setFailMsg(FailMsg failMsg) { this.failMsg = failMsg; } - + public EtlJobType getEtlJobType() { return etlJobType; } @@ -408,9 +408,13 @@ public class LoadJob implements Writable { return pullLoadSourceInfo; } - public void setExecMemLimit(long execMemLimit) { this.execMemLimit = execMemLimit; } + public void setExecMemLimit(long 
execMemLimit) { + this.execMemLimit = execMemLimit; + } - public long getExecMemLimit() { return execMemLimit; } + public long getExecMemLimit() { + return execMemLimit; + } public void setEtlJobType(EtlJobType etlJobType) { this.etlJobType = etlJobType; @@ -485,17 +489,18 @@ public class LoadJob implements Writable { public void setHadoopEtlJobId(String etlJobId) { if (etlJobType == EtlJobType.HADOOP) { - ((HadoopEtlJobInfo) etlJobInfo).setEtlJobId(etlJobId);; + ((HadoopEtlJobInfo) etlJobInfo).setEtlJobId(etlJobId); + ; } } - + public Map<Long, MiniEtlTaskInfo> getMiniEtlTasks() { if (etlJobType == EtlJobType.MINI) { return ((MiniEtlJobInfo) etlJobInfo).getEtlTasks(); } return null; } - + public MiniEtlTaskInfo getMiniEtlTask(long taskId) { if (etlJobType == EtlJobType.MINI) { return ((MiniEtlJobInfo) etlJobInfo).getEtlTask(taskId); @@ -508,7 +513,7 @@ public class LoadJob implements Writable { ((MiniEtlJobInfo) etlJobInfo).setEtlTasks(idToEtlTask); } } - + public boolean miniNeedGetTaskStatus() { if (etlJobType == EtlJobType.MINI) { return ((MiniEtlJobInfo) etlJobInfo).needGetTaskStatus(); @@ -519,15 +524,15 @@ public class LoadJob implements Writable { public EtlStatus getEtlJobStatus() { return etlJobInfo.getJobStatus(); } - + public void setEtlJobStatus(EtlStatus etlStatus) { etlJobInfo.setJobStatus(etlStatus); } - + public Map<Long, TableLoadInfo> getIdToTableLoadInfo() { return idToTableLoadInfo; } - + public TableLoadInfo getTableLoadInfo(long tableId) { return idToTableLoadInfo.get(tableId); } @@ -556,7 +561,7 @@ public class LoadJob implements Writable { public Map<Long, TabletLoadInfo> getIdToTabletLoadInfo() { return idToTabletLoadInfo; } - + public TabletLoadInfo getTabletLoadInfo(long tabletId) { return idToTabletLoadInfo.get(tabletId); } @@ -564,19 +569,19 @@ public class LoadJob implements Writable { public void setIdToTabletLoadInfo(Map<Long, TabletLoadInfo> idTotabletLoadInfo) { this.idToTabletLoadInfo = idTotabletLoadInfo; } - + public void 
addQuorumTablet(long tabletId) { quorumTablets.add(tabletId); } - + public Set<Long> getQuorumTablets() { return quorumTablets; } - + public void clearQuorumTablets() { quorumTablets.clear(); } - + public void addFullTablet(long tabletId) { fullTablets.add(tabletId); } @@ -584,24 +589,24 @@ public class LoadJob implements Writable { public Set<Long> getFullTablets() { return fullTablets; } - + public void setUnfinishedTablets(Set<Long> unfinishedTablets) { this.unfinishedTablets.clear(); this.unfinishedTablets.addAll(unfinishedTablets); } - + public void addPushTask(PushTask pushTask) { pushTasks.add(pushTask); } - + public Set<PushTask> getPushTasks() { return pushTasks; } - + public Map<Long, ReplicaPersistInfo> getReplicaPersistInfos() { return this.replicaPersistInfos; } - + public void addReplicaPersistInfos(ReplicaPersistInfo info) { if (!replicaPersistInfos.containsKey(info.getReplicaId())) { replicaPersistInfos.put(info.getReplicaId(), info); @@ -615,16 +620,16 @@ public class LoadJob implements Writable { public TResourceInfo getResourceInfo() { return resourceInfo; } - + public boolean addFinishedReplica(Replica replica) { finishedReplicas.put(replica.getId(), replica); return true; } - + public boolean isReplicaFinished(long replicaId) { return finishedReplicas.containsKey(replicaId); } - + public Collection<Replica> getFinishedReplicas() { return finishedReplicas.values(); } @@ -632,24 +637,24 @@ public class LoadJob implements Writable { public List<Predicate> getConditions() { return conditions; } - + public boolean isSyncDeleteJob() { if (conditions != null) { return true; } return false; } - + public DeleteInfo getDeleteInfo() { return deleteInfo; } - + public long getDeleteJobTimeout() { // timeout is between 30 seconds to 5 min long timeout = Math.max(idToTabletLoadInfo.size() * Config.tablet_delete_timeout_second * 1000L, 30000L); return Math.min(timeout, Config.load_straggler_wait_second * 1000L); } - + @Override public String toString() { 
return "LoadJob [id=" + id + ", dbId=" + dbId + ", label=" + label + ", timeoutSecond=" + timeoutSecond @@ -657,9 +662,9 @@ public class LoadJob implements Writable { + ", progress=" + progress + ", createTimeMs=" + createTimeMs + ", etlStartTimeMs=" + etlStartTimeMs + ", etlFinishTimeMs=" + etlFinishTimeMs + ", loadStartTimeMs=" + loadStartTimeMs + ", loadFinishTimeMs=" + loadFinishTimeMs + ", failMsg=" + failMsg + ", etlJobType=" + etlJobType - + ", etlJobInfo=" + etlJobInfo + ", priority=" + priority + ", transactionId=" + transactionId - + ", quorumFinishTimeMs=" + quorumFinishTimeMs - + ", unfinished tablets=[" + this.unfinishedTablets.subList(0, Math.min(3, this.unfinishedTablets.size())) + "]" + + ", etlJobInfo=" + etlJobInfo + ", priority=" + priority + ", transactionId=" + transactionId + + ", quorumFinishTimeMs=" + quorumFinishTimeMs + + ", unfinished tablets=[" + this.unfinishedTablets.subList(0, Math.min(3, this.unfinishedTablets.size())) + "]" + "]"; } @@ -669,22 +674,22 @@ public class LoadJob implements Writable { idToTableLoadInfo.clear(); idToTableLoadInfo = null; } - + if (idToTabletLoadInfo != null) { idToTabletLoadInfo.clear(); idToTabletLoadInfo = null; } - + if (quorumTablets != null) { quorumTablets.clear(); quorumTablets = null; } - + if (fullTablets != null) { fullTablets.clear(); fullTablets = null; } - + if (replicaPersistInfos != null) { replicaPersistInfos.clear(); replicaPersistInfos = null; @@ -718,7 +723,7 @@ public class LoadJob implements Writable { out.writeLong(createTimeMs); out.writeLong(etlStartTimeMs); out.writeLong(etlFinishTimeMs); - out.writeLong(loadStartTimeMs); + out.writeLong(loadStartTimeMs); out.writeLong(loadFinishTimeMs); failMsg.write(out); Text.writeString(out, etlJobType.name()); @@ -736,7 +741,7 @@ public class LoadJob implements Writable { entry.getValue().write(out); } } - + if (idToTabletLoadInfo == null) { out.writeBoolean(false); } else { @@ -748,7 +753,7 @@ public class LoadJob implements Writable { 
entry.getValue().write(out); } } - + if (fullTablets == null) { out.writeBoolean(false); } else { @@ -759,7 +764,7 @@ public class LoadJob implements Writable { out.writeLong(id); } } - + if (replicaPersistInfos == null) { out.writeBoolean(false); } else { @@ -800,7 +805,7 @@ public class LoadJob implements Writable { out.writeLong(execMemLimit); out.writeLong(transactionId); - + if (conditions != null) { out.writeBoolean(true); count = conditions.size(); @@ -858,7 +863,7 @@ public class LoadJob implements Writable { } timeoutSecond = in.readInt(); maxFilterRatio = in.readDouble(); - + boolean deleteFlag = false; if (version >= FeMetaVersion.VERSION_30) { deleteFlag = in.readBoolean(); @@ -889,7 +894,7 @@ public class LoadJob implements Writable { long key = in.readLong(); TableLoadInfo value = new TableLoadInfo(); value.readFields(in); - idToTableLoadInfo.put(key, value); + idToTableLoadInfo.put(key, value); } } @@ -903,7 +908,7 @@ public class LoadJob implements Writable { idToTabletLoadInfo.put(key, tLoadInfo); } } - + if (in.readBoolean()) { count = in.readInt(); fullTablets = new HashSet<Long>(); @@ -912,7 +917,7 @@ public class LoadJob implements Writable { fullTablets.add(id); } } - + if (in.readBoolean()) { count = in.readInt(); replicaPersistInfos = Maps.newHashMap(); @@ -994,19 +999,19 @@ public class LoadJob implements Writable { } } } - + @Override public boolean equals(Object obj) { if (obj == this) { return true; } - + if (!(obj instanceof LoadJob)) { return false; } - + LoadJob job = (LoadJob) obj; - + if (this.id == job.id) { return true; } @@ -1016,6 +1021,6 @@ public class LoadJob implements Writable { // Return true if this job is finished for a long time public boolean isExpired(long currentTimeMs) { return (getState() == JobState.FINISHED || getState() == JobState.CANCELLED) - && (currentTimeMs - getLoadFinishTimeMs()) / 1000 > Config.label_keep_max_second; + && (currentTimeMs - getLoadFinishTimeMs()) / 1000 > Config.label_keep_max_second; } } 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index fabc2c7..67e96f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -42,6 +42,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.metric.GaugeMetric; import org.apache.doris.metric.Metric.MetricUnit; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.persist.BackendReplicasInfo; import org.apache.doris.persist.BackendTabletsInfo; import org.apache.doris.persist.ReplicaPersistInfo; import org.apache.doris.system.Backend; @@ -808,8 +809,7 @@ public class ReportHandler extends Daemon { tabletRecoveryMap.size(), backendId); TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex(); - BackendTabletsInfo backendTabletsInfo = new BackendTabletsInfo(backendId); - backendTabletsInfo.setBad(true); + BackendReplicasInfo backendReplicasInfo = new BackendReplicasInfo(backendId); for (Long dbId : tabletRecoveryMap.keySet()) { Database db = Catalog.getCurrentCatalog().getDbNullable(dbId); if (db == null) { @@ -859,37 +859,17 @@ public class ReportHandler extends Daemon { if (replica.setBad(true)) { LOG.warn("set bad for replica {} of tablet {} on backend {}", replica.getId(), tabletId, backendId); - ReplicaPersistInfo replicaPersistInfo = ReplicaPersistInfo.createForReport( - dbId, tableId, partitionId, indexId, tabletId, backendId, replica.getId()); - backendTabletsInfo.addReplicaInfo(replicaPersistInfo); + backendReplicasInfo.addBadReplica(tabletId); } break; } - if (replica.getVersion() > tTabletInfo.getVersion()) { - LOG.warn("recover for replica {} of tablet {} on backend {}", - replica.getId(), tabletId, backendId); - if (replica.getVersion() == tTabletInfo.getVersion() + 1) { - // this missing version is the last version of this replica - 
replica.updateVersionInfoForRecovery( - tTabletInfo.getVersion(), /* set version to BE report version */ - -1, /* BE report version hash is meaningless here */ - replica.getVersion(), /* set LFV to current FE version */ - replica.getVersionHash(), /* set LFV hash to current FE version hash */ - tTabletInfo.getVersion(), /* set LSV to BE report version */ - -1 /* LSV hash is unknown */); - } else { - // this missing version is a hole - replica.updateVersionInfoForRecovery( - tTabletInfo.getVersion(), /* set version to BE report version */ - -1, /* BE report version hash is meaningless here */ - tTabletInfo.getVersion() + 1, /* LFV */ - -1, /* LFV hash is unknown */ - /* remain LSV unchanged, which should be equal to replica.version */ - replica.getLastSuccessVersion(), - replica.getLastSuccessVersionHash()); - } - // no need to write edit log, if FE crashed, this will be recovered again + if (tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) { + // The absolute value is meaningless, as long as it is greater than 0. + // This way, in other checking logic, if lastFailedVersion is found to be greater than 0, + // it will be considered a version missing replica and will be handled accordingly. 
+ replica.setLastFailedVersion(1L); + backendReplicasInfo.addMissingVersionReplica(tabletId); break; } } @@ -900,9 +880,9 @@ public class ReportHandler extends Daemon { } } // end for recovery map - if (!backendTabletsInfo.isEmpty()) { + if (!backendReplicasInfo.isEmpty()) { // need to write edit log the sync the bad info to other FEs - Catalog.getCurrentCatalog().getEditLog().logBackendTabletsInfo(backendTabletsInfo); + Catalog.getCurrentCatalog().getEditLog().logBackendReplicasInfo(backendReplicasInfo); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java new file mode 100644 index 0000000..c382e1f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.persist; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +// This class is used to record the replica information that needs to be persisted +// and synchronized to other FEs when BE reports tablet, +// such as bad replica or missing version replica information, etc. +public class BackendReplicasInfo implements Writable { + + @SerializedName(value = "backendId") + private long backendId; + @SerializedName(value = "replicaReportInfos") + private List<ReplicaReportInfo> replicaReportInfos = Lists.newArrayList(); + + public BackendReplicasInfo(long backendId) { + this.backendId = backendId; + } + + public void addBadReplica(long tabletId) { + replicaReportInfos.add(new ReplicaReportInfo(tabletId, ReportInfoType.BAD)); + } + + public void addMissingVersionReplica(long tabletId) { + replicaReportInfos.add(new ReplicaReportInfo(tabletId, ReportInfoType.MISSING_VERSION)); + } + + public long getBackendId() { + return backendId; + } + + public List<ReplicaReportInfo> getReplicaReportInfos() { + return replicaReportInfos; + } + + public boolean isEmpty() { + return replicaReportInfos.isEmpty(); + } + + public static BackendReplicasInfo read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, BackendReplicasInfo.class); + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public enum ReportInfoType { + BAD, + MISSING_VERSION + } + + public static class ReplicaReportInfo implements Writable { + @SerializedName(value = "tabletId") + public long tabletId; + @SerializedName(value = "type") + public 
ReportInfoType type; + + public ReplicaReportInfo(long tabletId, ReportInfoType type) { + this.tabletId = tabletId; + this.type = type; + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static ReplicaReportInfo read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, ReplicaReportInfo.class); + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BackendTabletsInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendTabletsInfo.java index 5182436..23c0a98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/BackendTabletsInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendTabletsInfo.java @@ -27,6 +27,8 @@ import java.io.DataOutput; import java.io.IOException; import java.util.List; +@Deprecated +// replaced by BackendReplicaInfo public class BackendTabletsInfo implements Writable { private long backendId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index c0a591a..7ce4b53 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -621,6 +621,11 @@ public class EditLog { Catalog.getCurrentCatalog().replayBackendTabletsInfo(backendTabletsInfo); break; } + case OperationType.OP_BACKEND_REPLICAS_INFO: { + BackendReplicasInfo backendReplicasInfo = (BackendReplicasInfo) journal.getData(); + Catalog.getCurrentCatalog().replayBackendReplicasInfo(backendReplicasInfo); + break; + } case OperationType.OP_CREATE_ROUTINE_LOAD_JOB: { RoutineLoadJob routineLoadJob = (RoutineLoadJob) journal.getData(); Catalog.getCurrentCatalog().getRoutineLoadManager().replayCreateRoutineLoadJob(routineLoadJob); @@ -1253,10 +1258,15 @@ public class EditLog { 
logEdit(OperationType.OP_DROP_ENCRYPTKEY, desc); } + @Deprecated public void logBackendTabletsInfo(BackendTabletsInfo backendTabletsInfo) { logEdit(OperationType.OP_BACKEND_TABLETS_INFO, backendTabletsInfo); } + public void logBackendReplicasInfo(BackendReplicasInfo backendReplicasInfo) { + logEdit(OperationType.OP_BACKEND_REPLICAS_INFO, backendReplicasInfo); + } + public void logCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) { logEdit(OperationType.OP_CREATE_ROUTINE_LOAD_JOB, routineLoadJob); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 2a5f3a2..bb5aaa9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -88,8 +88,10 @@ public class OperationType { @Deprecated public static final short OP_FINISH_ASYNC_DELETE = 44; public static final short OP_UPDATE_REPLICA = 45; + @Deprecated public static final short OP_BACKEND_TABLETS_INFO = 46; public static final short OP_SET_REPLICA_STATUS = 47; + public static final short OP_BACKEND_REPLICAS_INFO = 48; public static final short OP_ADD_BACKEND = 50; public static final short OP_DROP_BACKEND = 51; diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java new file mode 100644 index 0000000..54ceaf4 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.meta.MetaContext; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.util.List; + +public class BackendReplicaInfosTest { + + long beId = 1000; + long tabletId1 = 2001; + long tabletId2 = 2002; + + @Test + public void testSerialization() throws Exception { + MetaContext metaContext = new MetaContext(); + metaContext.setMetaVersion(FeMetaVersion.VERSION_CURRENT); + metaContext.setThreadLocalInfo(); + + // 1. Write objects to file + File file = new File("./BackendReplicaInfosTest"); + file.createNewFile(); + DataOutputStream dos = new DataOutputStream(new FileOutputStream(file)); + + BackendReplicasInfo info = new BackendReplicasInfo(beId); + info.addBadReplica(tabletId1); + info.addMissingVersionReplica(tabletId2); + checkInfo(info); + info.write(dos); + dos.flush(); + dos.close(); + // 2. Read objects from file + DataInputStream dis = new DataInputStream(new FileInputStream(file)); + + BackendReplicasInfo rInfo1 = BackendReplicasInfo.read(dis); + checkInfo(rInfo1); + + // 3. 
delete files + dis.close(); + file.delete(); + } + + private void checkInfo(BackendReplicasInfo info) { + Assert.assertTrue(!info.isEmpty()); + List<BackendReplicasInfo.ReplicaReportInfo> infos = info.getReplicaReportInfos(); + for (BackendReplicasInfo.ReplicaReportInfo reportInfo : infos) { + if (reportInfo.tabletId == tabletId1) { + Assert.assertEquals(BackendReplicasInfo.ReportInfoType.BAD, reportInfo.type); + } else if (reportInfo.tabletId == tabletId2) { + Assert.assertEquals(BackendReplicasInfo.ReportInfoType.MISSING_VERSION, reportInfo.type); + } else { + Assert.fail("unknown tablet id: " + reportInfo.tabletId); + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
