kaori-seasons commented on PR #1923:
URL: https://github.com/apache/fluss/pull/1923#issuecomment-3514923872

   
   
   @agoncharuk Regarding your point about "Fluss connectors not being able to 
know in advance whether historical data is needed during planning": I've 
thought about this for a long time. I believe it is possible in many cases, 
though I understand your concerns. I've added a method to FlussMetadata that 
returns the table's time boundary information and Lakehouse configuration, so 
the Trino planner can make a more informed decision during planning.
   
   ### 1. Determining Whether Historical Data Is Needed at Planning Time
   
   The implementation shows that this determination can be made at the 
planning stage:
   
   ```java
   // In FlussUnionReadManager
   public UnionReadStrategy determineStrategy(FlussTableHandle tableHandle) {
       // 1. First check if Union Read is applicable
       if (!isUnionReadApplicable(tableHandle)) {
           return UnionReadStrategy.REAL_TIME_ONLY;
       }
   
       // 2. Check if only historical data is needed
       if (isHistoricalOnlyQuery(tableHandle)) {
           return UnionReadStrategy.HISTORICAL_ONLY;
       }
       
       // 3. Check if only real-time data is needed
       if (isRealTimeOnlyQuery(tableHandle)) {
           return UnionReadStrategy.REAL_TIME_ONLY;
       }
       
       // 4. Perform complex analysis to determine optimal strategy
       return analyzeQueryForOptimalStrategy(tableHandle);
   }
   ```
   
   ### 2. Decision Basis Includes Multiple Dimensions
   
   We can determine whether historical data is needed at the planning stage 
based on the following information:
   
   #### a) Table Configuration Information
   ```java
   // Check if the table has Lakehouse configuration
   Optional<String> lakehouseFormat = tableInfo.getTableDescriptor()
           .getCustomProperties()
           .map(props -> props.get("datalake.format"));
   ```
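
As a minimal sketch of how this lookup could feed the applicability check, assuming the custom properties are available as a plain `Map<String, String>`: the class `LakeConfigCheck` and both of its methods are hypothetical, and only the `datalake.format` key comes from the snippet above.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; not part of Fluss or Trino.
final class LakeConfigCheck {
    // Key taken from the snippet above.
    static final String LAKE_FORMAT_KEY = "datalake.format";

    // Returns the configured lake format, treating a missing or blank value as absent.
    static Optional<String> lakeFormat(Map<String, String> customProperties) {
        return Optional.ofNullable(customProperties.get(LAKE_FORMAT_KEY))
                .map(String::trim)
                .filter(value -> !value.isEmpty());
    }

    // Assumption: Union Read is only considered when a lake format is configured.
    static boolean isUnionReadApplicable(Map<String, String> customProperties) {
        return lakeFormat(customProperties).isPresent();
    }
}
```

With this shape, a table without the property falls through to the real-time-only path before any predicate analysis runs.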
   
   #### b) Query Constraints
   ```java
   // Analyze query predicates
   private PredicateAnalysisResult analyzePredicates(FlussTableHandle tableHandle) {
       // Analyze time-related predicates
       // For example: WHERE date < '2024-01-01' may only need historical data
       // For example: WHERE date > '2024-06-01' may only need real-time data
   }
   ```
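
One way the time-predicate analysis could work is to compare the query's time range against the boundary between lake data and real-time data. The sketch below is only an illustration under stated assumptions: `TimeRangeClassifier`, its `Strategy` enum, and the half-open range convention are all hypothetical, and it assumes rows strictly before the boundary live in the lake while rows at or after it live in Fluss.

```java
import java.util.OptionalLong;

// Hypothetical classifier; the names here are not Fluss or Trino APIs.
final class TimeRangeClassifier {
    enum Strategy { HISTORICAL_ONLY, REAL_TIME_ONLY, UNION }

    // Classifies a half-open range [lower, upper) in epoch millis against the boundary.
    // An empty bound means the range is unbounded on that side.
    static Strategy classify(OptionalLong lower, OptionalLong upper, long boundary) {
        // Every matching row is before the boundary: the lake alone can answer.
        if (upper.isPresent() && upper.getAsLong() <= boundary) {
            return Strategy.HISTORICAL_ONLY;
        }
        // Every matching row is at or after the boundary: Fluss alone can answer.
        if (lower.isPresent() && lower.getAsLong() >= boundary) {
            return Strategy.REAL_TIME_ONLY;
        }
        // The range straddles the boundary or is unbounded: read both sides.
        return Strategy.UNION;
    }
}
```

A query with no time predicate at all yields two empty bounds and therefore `UNION`, which matches the goal of preferring completeness when the planner cannot narrow the range.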
   
   #### c) Time Boundary Information
   ```java
   // Get the time boundary between real-time and historical data
   public Optional<Long> getTimeBoundary(FlussTableHandle tableHandle) {
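       // Read the Lakehouse sync timestamp from the table's custom properties
       Optional<String> syncTimestamp = tableInfo.getTableDescriptor()
               .getCustomProperties()
               .flatMap(props -> Optional.ofNullable(props.get("lakehouse.sync.timestamp")));
       // Assumes the property holds epoch milliseconds as a decimal string
       return syncTimestamp.map(Long::parseLong);
   }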
   ```
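
Since the boundary comes from a string property, the planner should probably tolerate a missing or malformed value rather than fail the query. The sketch below shows one defensive way to parse it, assuming the value is epoch milliseconds. `TimeBoundaryParser` is a hypothetical helper, and the `lakehouse.sync.timestamp` key is copied from the snippet above rather than confirmed against Fluss.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; not part of Fluss or Trino.
final class TimeBoundaryParser {
    // Key copied from the snippet above; assumed to hold epoch milliseconds.
    static final String SYNC_TIMESTAMP_KEY = "lakehouse.sync.timestamp";

    // Returns the boundary if present and numeric, otherwise an empty Optional.
    static Optional<Long> parse(Map<String, String> customProperties) {
        String raw = customProperties.get(SYNC_TIMESTAMP_KEY);
        if (raw == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(Long.parseLong(raw.trim()));
        } catch (NumberFormatException e) {
            // A malformed value is treated as "boundary unknown", not as an error.
            return Optional.empty();
        }
    }
}
```

An empty result can then be handled like any other "cannot know in advance" case, for example by falling back to reading both sides.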
   
   #### d) Query Limit Conditions
   ```java
   // Analyze LIMIT conditions
   private double analyzeLimitBenefit(FlussTableHandle tableHandle) {
       if (tableHandle.getLimit().isPresent()) {
           long limit = tableHandle.getLimit().get();
           // Small LIMIT may be better suited for real-time data (lower latency)
           if (limit <= 100) {
               return 0.9; // High benefit for real-time data
           }
       }
       // Assumed neutral score when there is no LIMIT or the LIMIT is large
       return 0.5;
   }
   ```
   
   ### 3. Strategies for Handling "Cannot Know in Advance" Situations
   
   Where the need for historical data truly cannot be determined at planning 
time, we use the following strategies:
   
   #### a) Default Strategy
   ```java
   // When uncertain, default to UNION strategy
   // This ensures data completeness
   private UnionReadStrategy determineOptimalStrategy(FlussTableHandle tableHandle,
                                                      StrategyAnalysisResult analysis) {
       double realTimeScore = analysis.getRealTimeBenefit();
       double historicalScore = analysis.getHistoricalBenefit();
       
       // If scores are very close, use UNION strategy to ensure data completeness
       if (Math.abs(realTimeScore - historicalScore) < 0.1) {
           return UnionReadStrategy.UNION;
       }
       
       // Otherwise, choose the strategy with the higher score
       return realTimeScore > historicalScore ? 
           UnionReadStrategy.REAL_TIME_ONLY : 
           UnionReadStrategy.HISTORICAL_ONLY;
   }
   ```
   
   #### b) Runtime Validation and Adjustment
   ```java
   // Validate and adjust strategy during execution
   private UnionReadStrategy validateAndAdjustStrategy(
           FlussTableHandle tableHandle,
           UnionReadStrategy strategy,
           StrategyAnalysisResult analysis) {
       
       // Verify if historical data is actually available
       if (strategy == UnionReadStrategy.HISTORICAL_ONLY) {
           if (!isHistoricalDataAvailable(tableHandle)) {
               // If historical data is not available, switch to real-time data
               return UnionReadStrategy.REAL_TIME_ONLY;
           }
       }
       
       return strategy;
   }
   ```

