Chen768959 commented on code in PR #55274: URL: https://github.com/apache/doris/pull/55274#discussion_r2376374440
########## fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisRestClient.java: ########## @@ -0,0 +1,296 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.doris; + +import org.apache.doris.catalog.Column; +import org.apache.doris.common.util.JsonUtil; +import org.apache.doris.httpv2.entity.ResponseBody; +import org.apache.doris.httpv2.rest.HealthAction; +import org.apache.doris.httpv2.rest.RestApiStatusCode; +import org.apache.doris.httpv2.rest.response.GsonSchemaResponse; +import org.apache.doris.persist.gson.GsonUtils; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.gson.reflect.TypeToken; +import okhttp3.Credentials; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.apache.http.HttpHeaders; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.util.Strings; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; + +/** + * Use this restClient when the remote Doris cluster is the same version as the current cluster, + * ensuring complete tableSchema compatibility. + */ +public class RemoteDorisRestClient { + private static final Logger LOG = LogManager.getLogger(RemoteDorisRestClient.class); + + private static final OkHttpClient networkClient = new OkHttpClient + .Builder().readTimeout(10, TimeUnit.SECONDS).build(); + + private static OkHttpClient sslNetworkClient; + private final Request.Builder builder; + private final List<String> feNodes; + private String currentNode; + private int currentNodeIndex = 0; + private final boolean httpSslEnable; + + /** + * For DorisTable. + **/ + public RemoteDorisRestClient(List<String> feNodes, String authUser, String authPassword, boolean httpSslEnable) { + this.feNodes = feNodes; + this.builder = new Request.Builder(); + if (!Strings.isEmpty(authUser)) { + this.builder.addHeader(HttpHeaders.AUTHORIZATION, + Credentials.basic(authUser, Strings.isEmpty(authPassword) ? "" : authPassword)); + } + this.currentNode = feNodes.get(currentNodeIndex); + this.httpSslEnable = httpSslEnable; + } + + public List<String> getDatabaseNameList() { + return parseStringLists(execute("api/meta/namespaces/default_cluster/databases")); + } + + public List<String> getTablesNameList(String dbName) { + return parseStringLists(execute("api/meta/namespaces/default_cluster/databases/" + dbName + "/tables")); + } + + public boolean isTableExist(String dbName, String tableName) { + return parseSuccessResponse(execute("api/" + dbName + "/" + tableName + "/_schema")); + } + + public boolean health() { + int aliveBeNum = parseOnlineBeNum(execute("api/health")); + return aliveBeNum > 0; + } + + public List<Column> getColumns(String dbName, String tableName) { + return parseColumns(execute("api/" + dbName + "/" + tableName + "/_gson_schema")); + } + + public long getRowCount(String dbName, String tableName) { + return parseRowCount(execute("api/rowcount?db=" + dbName + "&table=" + tableName)); + } + + public static List<String> parseStringLists(String executeResult) { + ResponseBody<ArrayList<String>> databasesResponse = parseResponse( + new TypeToken<ArrayList<String>>() {}, + executeResult); + if (successResponse(databasesResponse)) { + return databasesResponse.getData(); + } + return new ArrayList<>(); + } + + public static boolean parseSuccessResponse(String executeResult) { + ObjectNode objectNode = JsonUtil.parseObject(executeResult); + Integer code = JsonUtil.safeGetAsInt(objectNode, "code"); + return code != null && code == RestApiStatusCode.OK.code; + } + + public static int parseOnlineBeNum(String executeResult) { + ResponseBody<HashMap<String, Integer>> healthResponse = parseResponse( + new TypeToken<HashMap<String, Integer>>() {}, + executeResult); + if (successResponse(healthResponse)) { + return healthResponse.getData().get(HealthAction.ONLINE_BACKEND_NUM); + } + throw new RuntimeException("get doris table schema error, msg: " + healthResponse.getMsg()); + } + + public static List<Column> parseColumns(String executeResult) { + ResponseBody<GsonSchemaResponse> getColumnsResponse = parseResponse( + new TypeToken<GsonSchemaResponse>(){}, + executeResult); + if (successResponse(getColumnsResponse)) { + return getColumnsResponse.getData().getJsonColumns().stream() + .map(json -> GsonUtils.GSON.fromJson(json, Column.class)) + .collect(Collectors.toList()); + } + throw new RuntimeException("get doris table schema error, msg: " + getColumnsResponse.getMsg()); + } + + public static long parseRowCount(String executeResult) { + ResponseBody<HashMap<String, Long>> rowCountResponse = parseResponse( + new TypeToken<HashMap<String, Long>>() {}, + executeResult); + if (successResponse(rowCountResponse)) { + return rowCountResponse.getData().values().iterator().next(); + } + throw new RuntimeException("get doris table row count error, msg: " + rowCountResponse.getMsg()); + } + + private void selectNextNode() { + currentNodeIndex++; + currentNodeIndex = currentNodeIndex % feNodes.size(); + currentNode = feNodes.get(currentNodeIndex); + } + + public OkHttpClient getClient() { + if (httpSslEnable) { + return getOrCreateSslNetworkClient(); + } + return networkClient; + } + + /** + * init ssl networkClient use lazy way + **/ + private synchronized OkHttpClient getOrCreateSslNetworkClient() { + if (sslNetworkClient == null) { + sslNetworkClient = new OkHttpClient.Builder().readTimeout(10, TimeUnit.SECONDS) + .sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts()) + .hostnameVerifier(new RemoteDorisRestClient.TrustAllHostnameVerifier()).build(); + } + return sslNetworkClient; + } + + private Response executeResponse(OkHttpClient httpClient, String path) throws IOException { + currentNode = currentNode.trim(); + if (!(currentNode.startsWith("http://") || currentNode.startsWith("https://"))) { + currentNode = "http://" + currentNode; + } + if (!currentNode.endsWith("/")) { + currentNode = currentNode + "/"; + } + Request request = builder.get().url(currentNode + path).build(); + if (LOG.isInfoEnabled()) { + LOG.info("doris rest client request URL: {}", request.url().toString()); + } + return httpClient.newCall(request).execute(); + } + + /** + * execute request for specific path,it will try again nodes.length times if it fails + * + * @param path the path must not leading with '/' + * @return response + */ + protected String execute(String path) { + // try 3 times for every node + int retrySize = feNodes.size() * 3; Review Comment: Changed to the catalog attribute variable. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java: ########## @@ -0,0 +1,308 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.doris.source; + +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.BoolLiteral; +import org.apache.doris.analysis.DateLiteral; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ExprSubstitutionMap; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.FileQueryScanNode; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.spi.Split; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TRemoteDorisFileDesc; +import org.apache.doris.thrift.TTableFormatFileDesc; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.grpc.CredentialCallOption; +import org.apache.arrow.flight.sql.FlightSqlClient; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class RemoteDorisScanNode extends FileQueryScanNode { + private static final Logger LOG = LogManager.getLogger(RemoteDorisScanNode.class); + + private static final int ADBC_EXEC_RETRY = 3; + + private final List<String> columns = new ArrayList<String>(); + private final List<String> filters = new ArrayList<String>(); + + private RemoteDorisSource source; + + public RemoteDorisScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, + SessionVariable sv) { + super(id, desc, "REMOTE_DORIS_SCAN_NODE", StatisticalType.REMOTE_DORIS_SCAN_NODE, needCheckColumnPriv, sv); + } + + @Override + protected void doInitialize() throws UserException { + super.doInitialize(); + source = new RemoteDorisSource(desc); + } + + @Override + public List<Split> getSplits(int numBackends) throws UserException { + List<Pair<String, ByteBuffer>> locationAndTicketList = executeAdbcQuery(); + + List<Split> splits = Lists.newArrayList(); + + for (Pair<String, ByteBuffer> locationAndTicket : locationAndTicketList) { + splits.add(new RemoteDorisSplit(locationAndTicket.first, locationAndTicket.second)); + } + + return splits; + } + + @Override + protected void setScanParams(TFileRangeDesc rangeDesc, Split split) { + if (split instanceof RemoteDorisSplit) { + RemoteDorisSplit dorisArrowSplit = (RemoteDorisSplit) split; + TRemoteDorisFileDesc fileDesc = new TRemoteDorisFileDesc(); + fileDesc.setIp(source.getHostAndArrowPort().key()); + fileDesc.setArrowPort(source.getHostAndArrowPort().value().toString()); + fileDesc.setTicket(dorisArrowSplit.getTicket()); + fileDesc.setLocationUri(dorisArrowSplit.getLocation()); + fileDesc.setUser(source.getCatalog().getUsername()); + fileDesc.setPassword(source.getCatalog().getPassword()); + + // set TTableFormatFileDesc + TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); + tableFormatFileDesc.setRemoteDorisParams(fileDesc); + tableFormatFileDesc.setTableFormatType(((RemoteDorisSplit) split).getTableFormatType().value()); + + // set TFileRangeDesc + rangeDesc.setTableFormatParams(tableFormatFileDesc); + } + } + + @Override + public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + StringBuilder output = new StringBuilder(); + + output.append(prefix).append("TABLE: ").append(source.getTargetTable().getExternalTableName()).append("\n"); + if (detailLevel == TExplainLevel.BRIEF) { + return output.toString(); + } + output.append(prefix).append("QUERY: ").append(getAdbcQueryStr()).append("\n"); + if (!conjuncts.isEmpty()) { + Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts); + output.append(prefix).append("PREDICATES: ").append(expr.toSql()).append("\n"); + } + + return output.toString(); + } + + @Override + protected TFileFormatType getFileFormatType() throws UserException { + return TFileFormatType.FORMAT_ARROW; + } + + @Override + protected List<String> getPathPartitionKeys() throws UserException { + return new ArrayList<>(); + } + + @Override + protected TableIf getTargetTable() throws UserException { + return desc.getTable(); + } + + @Override + protected Map<String, String> getLocationProperties() throws UserException { + return source.getCatalog().getProperties(); + } + + // Send an adbc request and receive the response result from adbc + private List<Pair<String, ByteBuffer>> executeAdbcQuery() { + createAdbcColumns(); + createAdbcFilters(); + + String queryStr = getAdbcQueryStr(); + RuntimeException scratchExceptionForThrow = null; + + for (int i = 0; i < ADBC_EXEC_RETRY; i++) { + try { + return getUriAndTickets( + source.nextHostAndArrowPort(), + source.getCatalog().getUsername(), + source.getCatalog().getPassword(), + queryStr + ); + } catch (Exception e) { + LOG.warn("arrow request node [{}] failures {}, try next nodes", + source.getHostAndArrowPort().toString(), e); + scratchExceptionForThrow = new RuntimeException(e.getMessage()); + } + } + + LOG.error("try all arrow nodes [{}], no other nodes left, sql:{}", source.getHostAndArrowPort(), queryStr); + throw scratchExceptionForThrow; + } + + public static List<Pair<String, ByteBuffer>> getUriAndTickets(Pair<String, Integer> hostAndPort, + String user, String psw, String sql) throws Exception { + List<Pair<String, ByteBuffer>> uniquePairs = new ArrayList<>(); + FlightClient clientFE = null; + FlightSqlClient sqlClientFE = null; + try { + BufferAllocator allocatorFE = new RootAllocator(Integer.MAX_VALUE); + final Location clientLocationFE = new Location( + new URI("grpc", null, hostAndPort.first, + hostAndPort.second, null, null, null) + ); + + clientFE = FlightClient.builder(allocatorFE, clientLocationFE).build(); + sqlClientFE = new FlightSqlClient(clientFE); + + CredentialCallOption credentialCallOption = clientFE.authenticateBasicToken(user, psw).get(); + FlightSqlClient.PreparedStatement preparedStatement = sqlClientFE.prepare(sql, credentialCallOption); + FlightInfo info = preparedStatement.execute(credentialCallOption); + List<FlightEndpoint> endpoints = info.getEndpoints(); + + Set<String> seenPairs = new HashSet<>(); + for (FlightEndpoint endpoint : endpoints) { + ByteBuffer ticket = endpoint.getTicket().serialize(); + String ticketStr = ticket.toString(); + + for (Location location : endpoint.getLocations()) { + String uri = location.getUri().toString(); + String compositeKey = ticketStr + "|" + uri; + + if (seenPairs.add(compositeKey)) { + uniquePairs.add(Pair.of(uri, ticket)); + } + } + } + } finally { + if (sqlClientFE != null) { + sqlClientFE.close(); + } + if (clientFE != null) { + clientFE.close(); + } + } + + return uniquePairs; + } + + private void createAdbcColumns() { + columns.clear(); + for (SlotDescriptor slot : desc.getSlots()) { + if (!slot.isMaterialized()) { + continue; + } + Column col = slot.getColumn(); + columns.add("`" + col.getName() + "`"); + } + if (columns.isEmpty()) { + columns.add("*"); + } + } + + private String getAdbcQueryStr() { + StringBuilder sql = new StringBuilder("SELECT "); + + if (source.getCatalog().enableParallelResultSink()) { + sql.append("/*+ SET_VAR(enable_parallel_result_sink=true) */ "); + } + + sql.append(Joiner.on(", ").join(columns)); + + sql.append(" FROM ").append(source.getTargetTable().getExternalTableName()); + + if (!filters.isEmpty()) { + sql.append(" WHERE ("); + sql.append(Joiner.on(") AND (").join(filters)); + sql.append(")"); + } + + if (limit != -1) { + sql.append(" LIMIT ").append(limit); + } + + return sql.toString(); + } + + private void createAdbcFilters() { + if (conjuncts.isEmpty()) { + return; + } + + List<SlotRef> slotRefs = Lists.newArrayList(); + Expr.collectList(conjuncts, SlotRef.class, slotRefs); + ExprSubstitutionMap sMap = new ExprSubstitutionMap(); + for (SlotRef slotRef : slotRefs) { + SlotRef slotRef1 = (SlotRef) slotRef.clone(); + slotRef1.setTblName(null); + slotRef1.setLabel("`" + slotRef1.getColumnName() + "`"); + sMap.put(slotRef, slotRef1); + } + + ArrayList<Expr> conjunctsList = Expr.cloneList(conjuncts, sMap); + for (Expr expr : conjunctsList) { + String filter = conjunctExprToString(expr, desc.getTable()); + filters.add(filter); + } + } + + private String conjunctExprToString(Expr expr, TableIf tbl) { + if (expr.contains(DateLiteral.class) && expr instanceof BinaryPredicate) { + ArrayList<Expr> children = expr.getChildren(); + String filter = children.get(0).toExternalSql(TableIf.TableType.DORIS_EXTERNAL_TABLE, tbl); + filter += " " + ((BinaryPredicate) expr).getOp().toString() + " "; + + filter += children.get(1).toExternalSql(TableIf.TableType.DORIS_EXTERNAL_TABLE, tbl); + + return filter; + } + + // Only for old planner + if (expr.contains(BoolLiteral.class) && "1".equals(expr.getStringValue()) && expr.getChildren().isEmpty()) { + return "1 = 1"; + } Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
