AntiTopQuark commented on code in PR #61199: URL: https://github.com/apache/doris/pull/61199#discussion_r3007252376
########## fe/fe-core/src/main/java/org/apache/doris/tso/TSOService.java: ########## @@ -0,0 +1,503 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tso; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.common.io.CountingDataOutputStream; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.journal.local.LocalJournal; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.persist.EditLog; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +public class TSOService extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(TSOService.class); + + // Global timestamp with physical time and logical counter + private final TSOTimestamp globalTimestamp = new TSOTimestamp(); + // Lock for thread-safe access to global timestamp + private final ReentrantLock lock = new ReentrantLock(); + // Guard value for time window updates (in milliseconds) + private static final long UPDATE_TIME_WINDOW_GUARD = 1; + + private final AtomicBoolean isInitialized = new AtomicBoolean(false); + private final AtomicBoolean fatalClockBackwardReported = new AtomicBoolean(false); + private final AtomicLong windowEndTSO = new AtomicLong(0); + + private static final class TSOClockBackwardException extends RuntimeException { + private TSOClockBackwardException(String message) { + super(message); + } + } + + /** + * Constructor initializes the TSO service with update interval + */ + public TSOService() { + super("TSO-service", Config.tso_service_update_interval_ms); + } + + /** + * Start the TSO service. + */ + @Override + public synchronized void start() { + super.start(); + } + + /** + * Periodically update timestamp after catalog is ready + * This method is called by the MasterDaemon framework + */ + @Override + protected void runAfterCatalogReady() { + if (!Config.enable_tso_feature) { + isInitialized.set(false); + fatalClockBackwardReported.set(false); + return; + } + int maxUpdateRetryCount = Math.max(1, Config.tso_max_update_retry_count); + boolean updated = false; + Throwable lastFailure = null; + if (!isInitialized.get()) { + for (int i = 0; i < maxUpdateRetryCount; i++) { + if (isInitialized.get()) { + break; + } + LOG.info("TSO service timestamp is not calibrated, start calibrate timestamp"); + try { + calibrateTimestamp(); + } catch (TSOClockBackwardException e) { + lastFailure = e; + if (fatalClockBackwardReported.compareAndSet(false, true)) { + LOG.error("TSO service calibrate timestamp failed due to clock backward beyond threshold", e); + throw e; + } + return; + } catch (Exception e) { + lastFailure = e; + LOG.warn("TSO service calibrate timestamp failed", e); + } + if (!isInitialized.get()) { + try { + sleep(Config.tso_service_update_interval_ms); + } catch (InterruptedException ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + } + } + if (!isInitialized.get()) { + return; + } + } + + for (int i = 0; i < maxUpdateRetryCount; i++) { + try { + updateTimestamp(); + updated = true; + break; + } catch (Exception e) { + lastFailure = e; + LOG.warn("TSO service update timestamp failed, retry: {}", i, e); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_UPDATE_FAILED.increase(1L); + } + try { + sleep(Config.tso_service_update_interval_ms); + } catch (InterruptedException ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + } + } + + if (updated) { + if (LOG.isDebugEnabled()) { + LOG.debug("TSO service updated timestamp"); + } + } else if (lastFailure != null) { + LOG.warn("TSO service update timestamp failed after {} retries", + maxUpdateRetryCount, lastFailure); + } else { + LOG.warn("TSO service update timestamp failed after {} retries", maxUpdateRetryCount); + } + } + + /** + * Generate a single TSO timestamp + * + * @return Composed TSO timestamp combining physical time and logical counter + * @throws RuntimeException if TSO is not calibrated or other errors occur + */ + public long getTSO() { + if (!isInitialized.get()) { + throw new RuntimeException("TSO timestamp is not calibrated, please check"); + } + int maxGetTSORetryCount = Math.max(1, Config.tso_max_get_retry_count); + RuntimeException lastFailure = null; + for (int i = 0; i < maxGetTSORetryCount; i++) { + // Wait for environment to be ready and ensure we're running on master FE + Env env = Env.getCurrentEnv(); + if (env == null || !env.isReady()) { + LOG.warn("TSO service wait for catalog ready"); + lastFailure = new RuntimeException("Env is null or not ready"); + try { + sleep(200); + } catch (InterruptedException ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + continue; + } else if (!env.isMaster()) { + LOG.warn("TSO service only run on master FE"); + lastFailure = new RuntimeException("Current FE is not master"); + try { + sleep(200); + } catch (InterruptedException ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + continue; + } + + Pair<Long, Long> pair = generateTSO(); + long physical = pair.first; + long logical = pair.second; + + if (physical == 0) { + throw new RuntimeException("TSO timestamp is not calibrated, please check"); + } + + // Check for logical counter overflow + if (logical > TSOTimestamp.MAX_LOGICAL_COUNTER) { + LOG.warn("TSO timestamp logical counter overflow, please check"); + lastFailure = new RuntimeException("TSO timestamp logical counter overflow"); + try { + sleep(Config.tso_service_update_interval_ms); + } catch (InterruptedException ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + continue; + } + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_GET_SUCCESS.increase(1L); + } + return TSOTimestamp.composeTimestamp(physical, logical); + } + throw new RuntimeException("Failed to get TSO after " + maxGetTSORetryCount + " retries", lastFailure); + } + + /** + * Get the current composed TSO timestamp + * + * @return Current TSO timestamp combining physical time and logical counter + */ + public long getCurrentTSO() { + lock.lock(); + try { + return globalTimestamp.composeTimestamp(); + } finally { + lock.unlock(); + } + } + + /** + * Calibrate the TSO timestamp when service starts + * This ensures the timestamp is consistent with the last persisted value + * + * Algorithm: + * - If Tnow - Tlast < 1ms, then Tnext = Tlast + 1 + * - Otherwise Tnext = Tnow + */ + private void calibrateTimestamp() { + if (isInitialized.get()) { + return; + } + // Check if Env is ready before calibration + Env env = Env.getCurrentEnv(); + if (env == null || !env.isReady() || !env.isMaster()) { + LOG.warn("Env is not ready or not master, skip TSO timestamp calibration"); + return; + } + + long timeLast = windowEndTSO.get(); // Last timestamp from image/editlog replay + long timeNow = System.currentTimeMillis() + Config.tso_time_offset_debug_mode; + long backwardMs = timeLast - timeNow; + if (backwardMs > Config.tso_clock_backward_startup_threshold_ms) { + throw new TSOClockBackwardException("TSO clock backward too much during calibration, backwardMs=" + + backwardMs + ", thresholdMs=" + Config.tso_clock_backward_startup_threshold_ms + + ", lastWindowEndTSO=" + timeLast + ", currentMillis=" + timeNow); + } + + // Calculate next physical time to ensure monotonicity + long nextPhysicalTime; + if (timeNow - timeLast < 1) { + nextPhysicalTime = timeLast + 1; + } else { + nextPhysicalTime = timeNow; + } + + // Construct new timestamp (physical time with reset logical counter) + setTSOPhysical(nextPhysicalTime, true); + + // Write the right boundary of time window to BDBJE for persistence + long timeWindowEnd = nextPhysicalTime + Config.tso_service_window_duration_ms; + windowEndTSO.set(timeWindowEnd); + writeTimestampToBDBJE(timeWindowEnd); + isInitialized.set(true); + + LOG.info("TSO timestamp calibrated: lastTimestamp={}, currentMillis={}, nextPhysicalTime={}, timeWindowEnd={}", + timeLast, timeNow, nextPhysicalTime, timeWindowEnd); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_CALCULATED.increase(1L); + } + } + + /** + * Update timestamp periodically to maintain time window + * This method handles various time-related issues: + * 1. Clock drift detection + * 2. Clock backward detection + * 3. Logical counter overflow handling + * 4. Time window renewal + */ + private void updateTimestamp() { + // Check if Env is ready + Env env = Env.getCurrentEnv(); + if (env == null || !env.isReady() || !env.isMaster()) { + LOG.warn("Env is not ready or not master, skip TSO timestamp update"); + return; + } + + // 1. Check if TSO has been calibrated + long currentTime = System.currentTimeMillis() + Config.tso_time_offset_debug_mode; + long prevPhysicalTime = 0; + long prevLogicalCounter = 0; + + lock.lock(); + try { + prevPhysicalTime = globalTimestamp.getPhysicalTimestamp(); + prevLogicalCounter = globalTimestamp.getLogicalCounter(); + } finally { + lock.unlock(); + } + + if (prevPhysicalTime == 0) { + LOG.error("TSO timestamp is not calibrated, please check"); Review Comment: done ########## fe/fe-core/src/main/java/org/apache/doris/tso/TSOService.java: ########## @@ -0,0 +1,503 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tso; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.common.io.CountingDataOutputStream; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.journal.local.LocalJournal; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.persist.EditLog; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +public class TSOService extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(TSOService.class); + + // Global timestamp with physical time and logical counter + private final TSOTimestamp globalTimestamp = new TSOTimestamp(); + // Lock for thread-safe access to global timestamp + private final ReentrantLock lock = new ReentrantLock(); + // Guard value for time window updates (in milliseconds) + private static final long UPDATE_TIME_WINDOW_GUARD = 1; + + private final AtomicBoolean isInitialized = new AtomicBoolean(false); + private final AtomicBoolean fatalClockBackwardReported = new AtomicBoolean(false); + private final AtomicLong windowEndTSO = new AtomicLong(0); + + private static final class TSOClockBackwardException extends RuntimeException { + private TSOClockBackwardException(String message) { + super(message); + } + } + + /** + * Constructor initializes the TSO service with update interval + */ + public TSOService() { + super("TSO-service", Config.tso_service_update_interval_ms); + } + + /** + * Start the TSO service. + */ + @Override + public synchronized void start() { + super.start(); + } + + /** + * Periodically update timestamp after catalog is ready + * This method is called by the MasterDaemon framework + */ + @Override + protected void runAfterCatalogReady() { + if (!Config.enable_tso_feature) { + isInitialized.set(false); + fatalClockBackwardReported.set(false); + return; + } + int maxUpdateRetryCount = Math.max(1, Config.tso_max_update_retry_count); + boolean updated = false; + Throwable lastFailure = null; + if (!isInitialized.get()) { + for (int i = 0; i < maxUpdateRetryCount; i++) { + if (isInitialized.get()) { + break; + } + LOG.info("TSO service timestamp is not calibrated, start calibrate timestamp"); + try { + calibrateTimestamp(); + } catch (TSOClockBackwardException e) { + lastFailure = e; + if (fatalClockBackwardReported.compareAndSet(false, true)) { + LOG.error("TSO service calibrate timestamp failed due to clock backward beyond threshold", e); + throw e; + } + return; + } catch (Exception e) { + lastFailure = e; + LOG.warn("TSO service calibrate timestamp failed", e); + } + if (!isInitialized.get()) { + try { + sleep(Config.tso_service_update_interval_ms); + } catch (InterruptedException ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + } + } + if (!isInitialized.get()) { + return; + } + } + + for (int i = 0; i < maxUpdateRetryCount; i++) { + try { + updateTimestamp(); + updated = true; + break; + } catch (Exception e) { + lastFailure = e; + LOG.warn("TSO service update timestamp failed, retry: {}", i, e); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_UPDATE_FAILED.increase(1L); + } + try { + sleep(Config.tso_service_update_interval_ms); + } catch (InterruptedException ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + } + } + + if (updated) { + if (LOG.isDebugEnabled()) { + LOG.debug("TSO service updated timestamp"); + } + } else if (lastFailure != null) { + LOG.warn("TSO service update timestamp failed after {} retries", + maxUpdateRetryCount, lastFailure); + } else { + LOG.warn("TSO service update timestamp failed after {} retries", maxUpdateRetryCount); + } + } + + /** + * Generate a single TSO timestamp + * + * @return Composed TSO timestamp combining physical time and logical counter + * @throws RuntimeException if TSO is not calibrated or other errors occur + */ + public long getTSO() { + if (!isInitialized.get()) { + throw new RuntimeException("TSO timestamp is not calibrated, please check"); + } + int maxGetTSORetryCount = Math.max(1, Config.tso_max_get_retry_count); + RuntimeException lastFailure = null; + for (int i = 0; i < maxGetTSORetryCount; i++) { + // Wait for environment to be ready and ensure we're running on master FE + Env env = Env.getCurrentEnv(); + if (env == null || !env.isReady()) { + LOG.warn("TSO service wait for catalog ready"); + lastFailure = new RuntimeException("Env is null or not ready"); + try { + sleep(200); + } catch (InterruptedException ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + continue; + } else if (!env.isMaster()) { + LOG.warn("TSO service only run on master FE"); + lastFailure = new RuntimeException("Current FE is not master"); + try { + sleep(200); + } catch (InterruptedException ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + continue; + } + + Pair<Long, Long> pair = generateTSO(); + long physical = pair.first; + long logical = pair.second; + + if (physical == 0) { + throw new RuntimeException("TSO timestamp is not calibrated, please check"); + } + + // Check for logical counter overflow + if (logical > TSOTimestamp.MAX_LOGICAL_COUNTER) { + LOG.warn("TSO timestamp logical counter overflow, please check"); + lastFailure = new RuntimeException("TSO timestamp logical counter overflow"); + try { + sleep(Config.tso_service_update_interval_ms); + } catch (InterruptedException ie) { + LOG.warn("TSO service sleep interrupted", ie); + Thread.currentThread().interrupt(); + } + continue; + } + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_GET_SUCCESS.increase(1L); + } + return TSOTimestamp.composeTimestamp(physical, logical); + } + throw new RuntimeException("Failed to get TSO after " + maxGetTSORetryCount + " retries", lastFailure); + } + + /** + * Get the current composed TSO timestamp + * + * @return Current TSO timestamp combining physical time and logical counter + */ + public long getCurrentTSO() { + lock.lock(); + try { + return globalTimestamp.composeTimestamp(); + } finally { + lock.unlock(); + } + } + + /** + * Calibrate the TSO timestamp when service starts + * This ensures the timestamp is consistent with the last persisted value + * + * Algorithm: + * - If Tnow - Tlast < 1ms, then Tnext = Tlast + 1 + * - Otherwise Tnext = Tnow + */ + private void calibrateTimestamp() { + if (isInitialized.get()) { + return; + } + // Check if Env is ready before calibration + Env env = Env.getCurrentEnv(); + if (env == null || !env.isReady() || !env.isMaster()) { + LOG.warn("Env is not ready or not master, skip TSO timestamp calibration"); + return; + } + + long timeLast = windowEndTSO.get(); // Last timestamp from image/editlog replay + long timeNow = System.currentTimeMillis() + Config.tso_time_offset_debug_mode; + long backwardMs = timeLast - timeNow; + if (backwardMs > Config.tso_clock_backward_startup_threshold_ms) { + throw new TSOClockBackwardException("TSO clock backward too much during calibration, backwardMs=" + + backwardMs + ", thresholdMs=" + Config.tso_clock_backward_startup_threshold_ms + + ", lastWindowEndTSO=" + timeLast + ", currentMillis=" + timeNow); + } + + // Calculate next physical time to ensure monotonicity + long nextPhysicalTime; + if (timeNow - timeLast < 1) { + nextPhysicalTime = timeLast + 1; + } else { + nextPhysicalTime = timeNow; + } + + // Construct new timestamp (physical time with reset logical counter) + setTSOPhysical(nextPhysicalTime, true); + + // Write the right boundary of time window to BDBJE for persistence + long timeWindowEnd = nextPhysicalTime + Config.tso_service_window_duration_ms; + windowEndTSO.set(timeWindowEnd); + writeTimestampToBDBJE(timeWindowEnd); + isInitialized.set(true); + + LOG.info("TSO timestamp calibrated: lastTimestamp={}, currentMillis={}, nextPhysicalTime={}, timeWindowEnd={}", + timeLast, timeNow, nextPhysicalTime, timeWindowEnd); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_CALCULATED.increase(1L); + } + } + + /** + * Update timestamp periodically to maintain time window + * This method handles various time-related issues: + * 1. Clock drift detection + * 2. Clock backward detection + * 3. Logical counter overflow handling + * 4. Time window renewal + */ + private void updateTimestamp() { + // Check if Env is ready + Env env = Env.getCurrentEnv(); + if (env == null || !env.isReady() || !env.isMaster()) { + LOG.warn("Env is not ready or not master, skip TSO timestamp update"); + return; + } + + // 1. Check if TSO has been calibrated + long currentTime = System.currentTimeMillis() + Config.tso_time_offset_debug_mode; + long prevPhysicalTime = 0; + long prevLogicalCounter = 0; + + lock.lock(); + try { + prevPhysicalTime = globalTimestamp.getPhysicalTimestamp(); + prevLogicalCounter = globalTimestamp.getLogicalCounter(); + } finally { + lock.unlock(); + } + + if (prevPhysicalTime == 0) { + LOG.error("TSO timestamp is not calibrated, please check"); + } + + // 2. Check for serious clock issues + long timeLag = currentTime - prevPhysicalTime; + if (timeLag >= 3 * Config.tso_service_update_interval_ms) { + // Clock drift (time difference too large), log clearly and trigger corresponding metric + LOG.warn("TSO clock drift detected, lastPhysicalTime={}, currentTime={}, " + + "timeLag={} (exceeds 3 * update interval {})", + prevPhysicalTime, currentTime, timeLag, 3 * Config.tso_service_update_interval_ms); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_DRIFT_DETECTED.increase(1L); + } + } else if (timeLag < 0) { + // Clock backward (current time earlier than last recorded time) + // log clearly and trigger corresponding metric + LOG.warn("TSO clock backward detected, lastPhysicalTime={}, currentTime={}, " + + "timeLag={} (current time is earlier than last physical time)", + prevPhysicalTime, currentTime, timeLag); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_BACKWARD_DETECTED.increase(1L); + } + } + + // 3. Update time based on conditions + long nextPhysicalTime = prevPhysicalTime; + if (timeLag > UPDATE_TIME_WINDOW_GUARD) { + // Align physical time to current time + nextPhysicalTime = currentTime; + } else if (prevLogicalCounter > TSOTimestamp.MAX_LOGICAL_COUNTER / 2) { + // Logical counter nearly full → advance to next millisecond + nextPhysicalTime = prevPhysicalTime + 1; + } else { + // Logical counter not nearly full → just increment logical counter + // do nothing + } + + // 4. Check if time window right boundary needs renewal + if ((windowEndTSO.get() - nextPhysicalTime) <= UPDATE_TIME_WINDOW_GUARD) { + // Time window right boundary needs renewal + long nextWindowEnd = nextPhysicalTime + Config.tso_service_window_duration_ms; + windowEndTSO.set(nextWindowEnd); + writeTimestampToBDBJE(nextWindowEnd); + } + + // 5. Update global timestamp + setTSOPhysical(nextPhysicalTime, false); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TSO_CLOCK_UPDATED.increase(1L); + } + } + + /** + * Write the right boundary of TSO time window to BDBJE for persistence + * + * @param timestamp The timestamp to write + */ + private void writeTimestampToBDBJE(long timestamp) { + try { + // Check if Env is ready + Env env = Env.getCurrentEnv(); + if (env == null) { + LOG.warn("Env is null, skip writing TSO timestamp to BDBJE"); + return; + } + + // Check if Env is ready and is master + if (!env.isReady()) { + LOG.warn("Env is not ready, skip writing TSO timestamp to BDBJE"); + return; + } + + if (!env.isMaster()) { + LOG.warn("Current node is not master, skip writing TSO timestamp to BDBJE"); + return; + } + + TSOTimestamp tsoTimestamp = new TSOTimestamp(timestamp, 0); + + // Check if EditLog is available + EditLog editLog = env.getEditLog(); + if (editLog == null) { + LOG.warn("EditLog is null, skip writing TSO timestamp to BDBJE"); + return; + } + + // Additional check to ensure EditLog's journal is properly initialized + if (editLog.getJournal() == null) { + LOG.warn("EditLog's journal is null, skip writing TSO timestamp to BDBJE"); + return; + } + + if (editLog.getJournal() instanceof LocalJournal) { + if (!((LocalJournal) editLog.getJournal()).isReadyToFlush()) { + LOG.warn("EditLog's journal is not ready to flush, skip writing TSO timestamp to BDBJE"); + return; + } + } + + if (Config.enable_tso_persist_journal) { + editLog.logTSOTimestampWindowEnd(tsoTimestamp); + } else { + LOG.debug("TSO timestamp {} is not persisted to journal, " + + "please check if enable_tso_persist_journal is set to true", + tsoTimestamp); + } + } catch (Exception e) { + LOG.error("Failed to write TSO timestamp to BDBJE", e); Review Comment: done ########## fe/fe-core/src/main/java/org/apache/doris/tso/TSOService.java: ########## @@ -0,0 +1,503 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tso; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.common.io.CountingDataOutputStream; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.journal.local.LocalJournal; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.persist.EditLog; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +public class TSOService extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(TSOService.class); + + // Global timestamp with physical time and logical counter + private final TSOTimestamp globalTimestamp = new TSOTimestamp(); + // Lock for thread-safe access to global timestamp + private final ReentrantLock lock = new ReentrantLock(); + // Guard value for time window updates (in milliseconds) + private static final long UPDATE_TIME_WINDOW_GUARD = 1; + + private final AtomicBoolean isInitialized = new AtomicBoolean(false); + private final AtomicBoolean fatalClockBackwardReported = new AtomicBoolean(false); + private final AtomicLong windowEndTSO = new AtomicLong(0); + + private static final class TSOClockBackwardException extends RuntimeException { + private TSOClockBackwardException(String message) { + super(message); + } + } + + /** + * Constructor initializes the TSO service with update interval + */ + public TSOService() { + super("TSO-service", Config.tso_service_update_interval_ms); + } + + /** + * Start the TSO service. + */ + @Override + public synchronized void start() { + super.start(); + } + + /** + * Periodically update timestamp after catalog is ready + * This method is called by the MasterDaemon framework + */ + @Override + protected void runAfterCatalogReady() { + if (!Config.enable_tso_feature) { + isInitialized.set(false); Review Comment: done ########## fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java: ########## @@ -102,13 +102,15 @@ public final class FeMetaVersion { public static final int VERSION_139 = 139; public static final int VERSION_140 = 140; + // For tso + public static final int VERSION_141 = 141; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_140; + public static final int VERSION_CURRENT = VERSION_141; // all logs meta version should >= the minimum version, so that we could remove many if clause, for example // if (FE_METAVERSION < VERSION_94) ... // these clause will be useless and we could remove them - public static final int MINIMUM_VERSION_REQUIRED = VERSION_140; + public static final int MINIMUM_VERSION_REQUIRED = VERSION_141; Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
