BePPPower opened a new issue, #17974:
URL: https://github.com/apache/doris/issues/17974

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no 
similar issues.
   
   
   ### Description
   
   This is a tutorial on how to add a new Metadata TVF, taking the `backends` TVF as an example.
   The implementation of the `backends` TVF is in PR #17667.
   # Concepts
   1. What is a table-valued function (TVF)?
   See #10217.
   2. What is a Metadata TVF?
   As mentioned above, a TVF is a function that can be used in the `FROM` clause. A Metadata TVF is a TVF for which BE needs to fetch its data from FE.
   
   # Introduction
   The SQL statement `show backends` returns information about the backends in the cluster. We want to implement a TVF called `backends()` that returns the same result as `show backends`.
   Because `show` statements cannot be used with `where` or `join` clauses, the `backends` TVF makes this information much easier to analyze; see the example query after the expected output below.
   
   The result of `show backends`:
   ```sql
   mysql> show backends\G
   *************************** 1. row ***************************
                 BackendId: 10003
                   Cluster: default_cluster
                        IP: 172.21.0.101
             HeartbeatPort: 9159
                    BePort: 9169
                  HttpPort: 8149
                  BrpcPort: 8169
             LastStartTime: 2023-03-09 11:21:16
             LastHeartbeat: 2023-03-09 11:34:04
                     Alive: true
      SystemDecommissioned: false
     ClusterDecommissioned: false
                 TabletNum: 21
          DataUsedCapacity: 0.000
             AvailCapacity: 193.080 GB
             TotalCapacity: 502.964 GB
                   UsedPct: 61.61 %
            MaxDiskUsedPct: 61.61 %
        RemoteUsedCapacity: 0.000
                       Tag: {"location" : "default"}
                    ErrMsg:
                   Version: doris-0.0.0-trunk-dee41d106b
                    Status: {"lastSuccessReportTabletsTime":"2023-03-09 
11:33:58","lastStreamLoadTime":-1,"isQueryDisabled":false,"isLoadDisabled":false}
   HeartbeatFailureCounter: 0
                  NodeRole: mix
   1 row in set (0.00 sec)
   ```
   
   We want the `backends` TVF to return the same result. Its parameter is `cluster_name`:
   ```sql
   mysql> select * from backends("cluster_name" = "default")\G
   *************************** 1. row ***************************
                 BackendId: 10003
                   Cluster: default_cluster
                        IP: 172.21.0.101
             HeartbeatPort: 9159
                    BePort: 9169
                  HttpPort: 8149
                  BrpcPort: 8169
             LastStartTime: 2023-03-09 11:21:16
             LastHeartbeat: 2023-03-09 11:34:04
                     Alive: true
      SystemDecommissioned: false
     ClusterDecommissioned: false
                 TabletNum: 21
          DataUsedCapacity: 0.000
             AvailCapacity: 193.080 GB
             TotalCapacity: 502.964 GB
                   UsedPct: 61.61 %
            MaxDiskUsedPct: 61.61 %
        RemoteUsedCapacity: 0.000
                       Tag: {"location" : "default"}
                    ErrMsg:
                   Version: doris-0.0.0-trunk-dee41d106b
                    Status: {"lastSuccessReportTabletsTime":"2023-03-09 
11:33:58","lastStreamLoadTime":-1,"isQueryDisabled":false,"isLoadDisabled":false}
   HeartbeatFailureCounter: 0
                  NodeRole: mix
   1 row in set (0.00 sec)
   ```
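
   Since the TVF result is an ordinary relation, it can be filtered, joined, and aggregated, which `show backends` cannot do. For example (an illustrative query; the column names are assumed to match the output above):
   ```sql
   mysql> select BackendId, IP, TabletNum from backends("cluster_name" = "default") where TabletNum > 10;
   ```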
   
   The information about Doris's backends is stored in FE, but the execution of the `SELECT` query plan is the responsibility of BE. So when BE executes the `SELECT * FROM backends(...)` query plan, it needs to send an RPC request to FE, asking FE to send the backends-related information back to it. The entire process is as follows:
   
![image](https://user-images.githubusercontent.com/43782773/226370947-f4029640-73da-41e2-8cf0-00249c7e5e66.png)
   
   # Steps
   ## Modify thrift
   1. Types.thrift
   
   Add a new type `BACKENDS` to `TMetadataType`:
   ```
   enum TMetadataType {
     ICEBERG,
     BACKENDS
   }
   ```
   
   2. PlanNodes.thrift
   
   Add a new Thrift struct called `TBackendsMetadataParams` that contains a parameter called `cluster_name`:
   ```
   struct TBackendsMetadataParams {
     1: optional string cluster_name
   }
   ```
   
   Add the structure to `TMetaScanRange`:
   ```
   struct TMetaScanRange {
     1: optional Types.TMetadataType metadata_type
     2: optional TIcebergMetadataParams iceberg_params
     3: optional TBackendsMetadataParams backends_params
   }
   ```
   
   3. FrontendService.thrift
   
   Add the structure to `TMetadataTableRequestParams`:
   ```
   struct TMetadataTableRequestParams {
     1: optional Types.TMetadataType metadata_type
     2: optional PlanNodes.TIcebergMetadataParams iceberg_metadata_params
     3: optional PlanNodes.TBackendsMetadataParams backends_metadata_params
   }
   ```
   
   ## FE code 1
   Create a new class called `BackendsTableValuedFunction` that extends `MetadataTableValuedFunction`:
   
   ```java
   public class BackendsTableValuedFunction extends MetadataTableValuedFunction 
{
   
       public BackendsTableValuedFunction(Map<String, String> params) throws 
AnalysisException {
       }
   
       @Override
       public TMetadataType getMetadataType() {}
   
       @Override
       public TMetaScanRange getMetaScanRange() {}
   
       @Override
       public String getTableName() {}
   
       @Override
       public List<Column> getTableColumns() throws AnalysisException {}
   }
   ```
   
   `BackendsTableValuedFunction` needs to implement its own constructor and override the four abstract methods of its superclass (a sketch of the full class follows this list):
   1. Constructor:
     The `Map<String, String> params` argument contains the parameter pairs sent by the client. The parameters can be validated and processed in the constructor.
   2. `getMetadataType`:
     Returns the `MetadataType` of this TVF, i.e. the new type added to `TMetadataType`.
   3. `getTableName`:
     Returns the name of this TVF.
   4. `getMetaScanRange`:
     Sets the parameters that FE passes to BE in `TMetaScanRange`.
   5. `getTableColumns`:
     Defines the columns of this TVF.
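
   For reference, here is a minimal sketch of what the filled-in class might look like. It is based only on the descriptions above: the `NAME` constant, the abbreviated column list, and the `getTableName` return value are illustrative assumptions, not the exact code of PR #17667.
   ```java
   // Doris-internal imports (Column, ScalarType, thrift types, ...) omitted for brevity.
   import java.util.List;
   import java.util.Map;

   import com.google.common.collect.ImmutableList;

   public class BackendsTableValuedFunction extends MetadataTableValuedFunction {
       public static final String NAME = "backends"; // assumed constant name

       // Abbreviated for this sketch; the real TVF defines one column per
       // field of `show backends` (BackendId, Cluster, IP, ...).
       private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
               new Column("BackendId", ScalarType.createType(PrimitiveType.BIGINT)),
               new Column("Cluster", ScalarType.createStringType()),
               new Column("IP", ScalarType.createStringType()));

       private final String clusterName;

       public BackendsTableValuedFunction(Map<String, String> params) throws AnalysisException {
           // Validate and store the parameter pairs sent by the client.
           this.clusterName = params.get("cluster_name");
       }

       @Override
       public TMetadataType getMetadataType() {
           // The new type added to TMetadataType in Types.thrift.
           return TMetadataType.BACKENDS;
       }

       @Override
       public TMetaScanRange getMetaScanRange() {
           // Pack the parameters that FE hands over to BE.
           TMetaScanRange metaScanRange = new TMetaScanRange();
           metaScanRange.setMetadataType(TMetadataType.BACKENDS);
           TBackendsMetadataParams backendsParams = new TBackendsMetadataParams();
           backendsParams.setClusterName(clusterName);
           metaScanRange.setBackendsParams(backendsParams);
           return metaScanRange;
       }

       @Override
       public String getTableName() {
           return "BackendsTableValuedFunction"; // illustrative
       }

       @Override
       public List<Column> getTableColumns() throws AnalysisException {
           return SCHEMA;
       }
   }
   ```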
   
   After writing `BackendsTableValuedFunction`, register it in 
`TableValuedFunctionIf`:
   ```java
   public static TableValuedFunctionIf getTableFunction(String funcName, 
Map<String, String> params)
                                                       throws AnalysisException 
{
       switch (funcName.toLowerCase()) {
           case NumbersTableValuedFunction.NAME:
               return new NumbersTableValuedFunction(params);
           case S3TableValuedFunction.NAME:
               return new S3TableValuedFunction(params);
           case HdfsTableValuedFunction.NAME:
               return new HdfsTableValuedFunction(params);
           case IcebergTableValuedFunction.NAME:
               return new IcebergTableValuedFunction(params);
           case BackendsTableValuedFunction.NAME:
               return new BackendsTableValuedFunction(params);
           default:
               throw new AnalysisException("Could not find table function " + 
funcName);
       }
   }
   ```
   
   ## BE code 1
   We need to add a `_build_backends_metadata_request` method to the `VMetaScanner` class. This method builds the request that is sent to FE.
   ```c++
   // vmeta_scanner.h
   Status _build_backends_metadata_request(const TMetaScanRange& 
meta_scan_range,
                                           TFetchSchemaTableDataRequest* 
request);
   
   // vmeta_scanner.cpp
   // Mainly set request parameters
   Status VMetaScanner::_build_backends_metadata_request(const TMetaScanRange& 
meta_scan_range,
                                                         
TFetchSchemaTableDataRequest* request) {
       VLOG_CRITICAL << "VMetaScanner::_build_backends_metadata_request";
       if (!meta_scan_range.__isset.backends_params) {
           return Status::InternalError("Can not find TBackendsMetadataParams 
from meta_scan_range.");
       }
       // create request
       request->__set_cluster_name("");
       request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);
   
       // create TMetadataTableRequestParams
       TMetadataTableRequestParams metadata_table_params;
       metadata_table_params.__set_metadata_type(TMetadataType::BACKENDS);
       
metadata_table_params.__set_backends_metadata_params(meta_scan_range.backends_params);
   
       request->__set_metada_table_params(metadata_table_params);
       return Status::OK();
   }
   ```
   
   Call the above method from the `_fetch_metadata` method:
   
   ```c++
   // vmeta_scanner.cpp
   Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
       VLOG_CRITICAL << "VMetaScanner::_fetch_metadata";
       TFetchSchemaTableDataRequest request;
       switch (meta_scan_range.metadata_type) {
       case TMetadataType::ICEBERG:
           RETURN_IF_ERROR(_build_iceberg_metadata_request(meta_scan_range, 
&request));
           break;
       case TMetadataType::BACKENDS:
           RETURN_IF_ERROR(_build_backends_metadata_request(meta_scan_range, 
&request));
           break;
       default:
           _meta_eos = true;
           return Status::OK();
       }
   
       ....
       
       return Status::OK();
   }
   ```
   
   ## FE code 2
   After FE receives BE's RPC request, the processing entry point is `MetadataGenerator#getMetadataTable`.
   We need to implement a method here that returns the backends information.
   
   ```java
   public static TFetchSchemaTableDataResult 
getMetadataTable(TFetchSchemaTableDataRequest request) {
       if (!request.isSetMetadaTableParams()) {
           return errorResult("Metadata table params is not set. ");
       }
       switch (request.getMetadaTableParams().getMetadataType()) {
           case ICEBERG:
               return icebergMetadataResult(request.getMetadaTableParams());
           case BACKENDS:
               return getBackendsSchemaTable(request);
           default:
               break;
       }
       return errorResult("Metadata table params is not set. ");
   }
   
   public static TFetchSchemaTableDataResult 
getBackendsSchemaTable(TFetchSchemaTableDataRequest request) {
       final SystemInfoService clusterInfoService = Env.getCurrentSystemInfo();
       List<Long> backendIds = null;
       if (!Strings.isNullOrEmpty(request.cluster_name)) {
           final Cluster cluster = 
Env.getCurrentEnv().getCluster(request.cluster_name);
           // root not in any cluster
           if (null == cluster) {
               return errorResult("Cluster is not existed.");
           }
           backendIds = cluster.getBackendIdList();
       } else {
           backendIds = clusterInfoService.getBackendIds(false);
       }
   
       TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
       
       ....
   
       result.setDataBatch(dataBatch);
       result.setStatus(new TStatus(TStatusCode.OK));
       return result;
   }
   ```
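
   The elided part assembles the row data. As a rough sketch, filling `dataBatch` might look like the fragment below. It assumes the Thrift `TRow`/`TCell` structures whose `column_value` list BE reads in the next section; the exact getters and the cell order are illustrative, not copied from PR #17667.
   ```java
   // Illustrative sketch of the elided row-building code above.
   List<TRow> dataBatch = Lists.newArrayList();
   for (long backendId : backendIds) {
       Backend backend = clusterInfoService.getBackend(backendId);
       if (backend == null) {
           continue;
       }
       TRow trow = new TRow();
       // One TCell per column; the order must match getTableColumns() in FE code 1.
       trow.addToColumnValue(new TCell().setLongVal(backend.getId()));
       trow.addToColumnValue(new TCell().setStringVal(backend.getHost())); // assumed getter
       dataBatch.add(trow);
   }
   ```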
   
   ## BE code 2
   After BE receives the data result from FE, the data is processed by the `_fill_block_with_remote_data` method. You may need to add handling for the corresponding column types here.
   ```c++
   Status VMetaScanner::_fill_block_with_remote_data(const 
std::vector<MutableColumnPtr>& columns) {
       VLOG_CRITICAL << "VMetaScanner::_fill_block_with_remote_data";
       for (int col_idx = 0; col_idx < columns.size(); col_idx++) {
           auto slot_desc = _tuple_desc->slots()[col_idx];
           // because the FE planner filters out non-materialized columns
           if (!slot_desc->is_materialized()) {
               continue;
           }
   
           for (int _row_idx = 0; _row_idx < _batch_data.size(); _row_idx++) {
               vectorized::IColumn* col_ptr = columns[col_idx].get();
               if (slot_desc->is_nullable() == true) {
                   auto* nullable_column = 
reinterpret_cast<vectorized::ColumnNullable*>(col_ptr);
                   col_ptr = &nullable_column->get_nested_column();
               }
               switch (slot_desc->type().type) {
               case TYPE_INT: {
                   int64_t data = 
_batch_data[_row_idx].column_value[col_idx].intVal;
                   
reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)
                           ->insert_value(data);
                   break;
               }
               case TYPE_BIGINT: {
                   int64_t data = 
_batch_data[_row_idx].column_value[col_idx].longVal;
                   
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)
                           ->insert_value(data);
                   break;
               }
               case TYPE_FLOAT: {
                   double data = 
_batch_data[_row_idx].column_value[col_idx].doubleVal;
                   
reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)
                           ->insert_value(data);
                   break;
               }
               case TYPE_DOUBLE: {
                   double data = 
_batch_data[_row_idx].column_value[col_idx].doubleVal;
                   
reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)
                           ->insert_value(data);
                   break;
               }
               case TYPE_DATETIMEV2: {
                   uint64_t data = 
_batch_data[_row_idx].column_value[col_idx].longVal;
                   
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt64>*>(col_ptr)
                           ->insert_value(data);
                   break;
               }
               case TYPE_STRING:
               case TYPE_CHAR:
               case TYPE_VARCHAR: {
                   std::string data = 
_batch_data[_row_idx].column_value[col_idx].stringVal;
                   
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(data.c_str(),
                                                                                
     data.length());
                   break;
               }
               default: {
                   std::string error_msg =
                           fmt::format("Invalid column type {} on column: {}.",
                                       slot_desc->type().debug_string(), 
slot_desc->col_name());
                   return Status::InternalError(std::string(error_msg));
               }
               }
           }
       }
       _meta_eos = true;
       return Status::OK();
   }
   ```
   
   ### Solution
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

