wuyunfeng commented on a change in pull request #3454: URL: https://github.com/apache/incubator-doris/pull/3454#discussion_r428034718
########## File path: fe/src/main/java/org/apache/doris/external/EsRestClient.java ########## @@ -49,48 +54,103 @@ .readTimeout(10, TimeUnit.SECONDS) .build(); - private String basicAuth; - - private int nextClient = 0; + private Request.Builder builder; private String[] nodes; private String currentNode; + private int currentNodeIndex = 0; public EsRestClient(String[] nodes, String authUser, String authPassword) { this.nodes = nodes; + this.builder = new Request.Builder(); if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) { - basicAuth = Credentials.basic(authUser, authPassword); + this.builder.addHeader(HttpHeaders.AUTHORIZATION, Credentials.basic(authUser, authPassword)); } - selectNextNode(); + this.currentNode = nodes[currentNodeIndex]; } - private boolean selectNextNode() { - if (nextClient >= nodes.length) { - return false; + private void selectNextNode() { + currentNodeIndex++; + // reroute, because the previously failed node may have already been restored + if (currentNodeIndex >= nodes.length) { + currentNodeIndex = 0; } - currentNode = nodes[nextClient++]; - return true; + currentNode = nodes[currentNodeIndex]; } public Map<String, EsNodeInfo> getHttpNodes() throws Exception { Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes"); if (nodesData == null) { return Collections.emptyMap(); } - Map<String, EsNodeInfo> nodes = new HashMap<>(); + Map<String, EsNodeInfo> nodesMap = new HashMap<>(); for (Map.Entry<String, Map<String, Object>> entry : nodesData.entrySet()) { EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); if (node.hasHttp()) { - nodes.put(node.getId(), node); + nodesMap.put(node.getId(), node); } } - return nodes; + return nodesMap; + } + + public JSONObject getIndexProperties(String indexName, String mappingType) { + String path = indexName + "/_mapping"; + String indexMapping = execute(path); + if (indexMapping == null) { + return null; + } + return parseProperties(indexMapping, 
mappingType); } - public String getIndexMetaData(String indexName) { - String path = "_cluster/state?indices=" + indexName - + "&metric=routing_table,nodes,metadata&expand_wildcards=open"; - return execute(path); + public JSONObject parseProperties(String indexMapping, String mappingType) { + JSONObject jsonObject = new JSONObject(indexMapping); + // the indexName use alias takes the first mapping + Iterator<String> keys = jsonObject.keys(); + String docKey = keys.next(); + JSONObject docData = jsonObject.optJSONObject(docKey); + JSONObject mappings = docData.optJSONObject("mappings"); + JSONObject rootSchema = mappings.optJSONObject(mappingType); + return rootSchema.optJSONObject("properties"); + } + + public EsIndexState getIndexState(String indexName) { + String path = indexName + "/_search_shards"; + String shardLocation = execute(path); + if (shardLocation == null) { + return null; + } + return parseIndexState(indexName, shardLocation); + } + public EsIndexState parseIndexState(String indexName, String shardLocation) { Review comment: ```suggestion private EsIndexState parseIndexState(String indexName, String shardLocation) { ``` ########## File path: fe/src/main/java/org/apache/doris/external/EsRestClient.java ########## @@ -46,55 +52,109 @@ } private static OkHttpClient networkClient = new OkHttpClient.Builder() - .readTimeout(10, TimeUnit.SECONDS) - .build(); + .readTimeout(10, TimeUnit.SECONDS) + .build(); - private String basicAuth; - - private int nextClient = 0; + private Request.Builder builder; private String[] nodes; private String currentNode; + private int currentNodeIndex = 0; public EsRestClient(String[] nodes, String authUser, String authPassword) { this.nodes = nodes; + this.builder = new Request.Builder(); if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) { - basicAuth = Credentials.basic(authUser, authPassword); + this.builder.addHeader(HttpHeaders.AUTHORIZATION, Credentials.basic(authUser, authPassword)); } - 
selectNextNode(); + this.currentNode = nodes[currentNodeIndex]; } - private boolean selectNextNode() { - if (nextClient >= nodes.length) { - return false; + private void selectNextNode() { + currentNodeIndex++; + // reroute, because the previously failed node may have already been restored + if (currentNodeIndex >= nodes.length) { + currentNodeIndex = 0; } - currentNode = nodes[nextClient++]; - return true; + currentNode = nodes[currentNodeIndex]; } public Map<String, EsNodeInfo> getHttpNodes() throws Exception { Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes"); if (nodesData == null) { return Collections.emptyMap(); } - Map<String, EsNodeInfo> nodes = new HashMap<>(); + Map<String, EsNodeInfo> nodesMap = new HashMap<>(); for (Map.Entry<String, Map<String, Object>> entry : nodesData.entrySet()) { EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); if (node.hasHttp()) { - nodes.put(node.getId(), node); + nodesMap.put(node.getId(), node); } } - return nodes; + return nodesMap; + } + + public JSONObject getIndexProperties(String indexName, String mappingType) { + String path = indexName + "/_mapping"; + String indexMapping = execute(path); + if (indexMapping == null) { + return null; + } + return parseProperties(indexMapping, mappingType); } - public String getIndexMetaData(String indexName) { - String path = "_cluster/state?indices=" + indexName - + "&metric=routing_table,nodes,metadata&expand_wildcards=open"; - return execute(path); + public JSONObject parseProperties(String indexMapping, String mappingType) { Review comment: ```suggestion private JSONObject parseProperties(String indexMapping, String mappingType) { ``` ########## File path: fe/src/main/java/org/apache/doris/external/EsRestClient.java ########## @@ -46,55 +52,109 @@ } private static OkHttpClient networkClient = new OkHttpClient.Builder() - .readTimeout(10, TimeUnit.SECONDS) - .build(); + .readTimeout(10, TimeUnit.SECONDS) + .build(); - private String 
basicAuth; - - private int nextClient = 0; + private Request.Builder builder; private String[] nodes; private String currentNode; + private int currentNodeIndex = 0; public EsRestClient(String[] nodes, String authUser, String authPassword) { this.nodes = nodes; + this.builder = new Request.Builder(); if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) { - basicAuth = Credentials.basic(authUser, authPassword); + this.builder.addHeader(HttpHeaders.AUTHORIZATION, Credentials.basic(authUser, authPassword)); } - selectNextNode(); + this.currentNode = nodes[currentNodeIndex]; } - private boolean selectNextNode() { - if (nextClient >= nodes.length) { - return false; + private void selectNextNode() { + currentNodeIndex++; + // reroute, because the previously failed node may have already been restored + if (currentNodeIndex >= nodes.length) { + currentNodeIndex = 0; } - currentNode = nodes[nextClient++]; - return true; + currentNode = nodes[currentNodeIndex]; } public Map<String, EsNodeInfo> getHttpNodes() throws Exception { Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes"); if (nodesData == null) { return Collections.emptyMap(); } - Map<String, EsNodeInfo> nodes = new HashMap<>(); + Map<String, EsNodeInfo> nodesMap = new HashMap<>(); for (Map.Entry<String, Map<String, Object>> entry : nodesData.entrySet()) { EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); if (node.hasHttp()) { - nodes.put(node.getId(), node); + nodesMap.put(node.getId(), node); } } - return nodes; + return nodesMap; + } + + public JSONObject getIndexProperties(String indexName, String mappingType) { + String path = indexName + "/_mapping"; + String indexMapping = execute(path); + if (indexMapping == null) { + return null; + } + return parseProperties(indexMapping, mappingType); Review comment: ```suggestion return indexMapping == null ? 
null : parseProperties(indexMapping, mappingType); ``` ########## File path: fe/src/main/java/org/apache/doris/external/EsStateStore.java ########## @@ -17,94 +17,95 @@ package org.apache.doris.external; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.EsTable; -import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.RangePartitionInfo; import org.apache.doris.catalog.SinglePartitionInfo; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Table.TableType; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.MasterDaemon; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; -import com.google.common.collect.Range; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.json.JSONObject; -import java.io.IOException; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import okhttp3.Authenticator; -import okhttp3.Call; -import okhttp3.Credentials; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import okhttp3.Route; - /** * it is used to call es api to get shard allocation state */ public class EsStateStore extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(EsStateStore.class); private Map<Long, EsTable> esTables; + private Map<Long, EsRestClient> esClients; + public EsStateStore() { 
super("es state store", Config.es_state_sync_interval_second * 1000); esTables = Maps.newConcurrentMap(); + esClients = Maps.newConcurrentMap(); } public void registerTable(EsTable esTable) { if (Catalog.isCheckpointThread()) { return; } esTables.put(esTable.getId(), esTable); - LOG.info("register a new table [{}] to sync list", esTable.toString()); + esClients.put(esTable.getId(), + new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd())); + LOG.info("register a new table [{}] to sync list", esTable); } public void deRegisterTable(long tableId) { esTables.remove(tableId); + esClients.remove(tableId); LOG.info("deregister table [{}] from sync list", tableId); } - + @Override protected void runAfterCatalogReady() { for (EsTable esTable : esTables.values()) { try { - EsRestClient client = new EsRestClient(esTable.getSeeds(), - esTable.getUserName(), esTable.getPasswd()); - // if user not specify the es version, try to get the remote cluster versoin - // in the future, we maybe need this version - String indexMetaData = client.getIndexMetaData(esTable.getIndexName()); - if (indexMetaData == null) { + EsRestClient client = esClients.get(esTable.getId()); + + if (esTable.isKeywordSniffEnable() || esTable.isDocValueScanEnable()) { + JSONObject properties = client.getIndexProperties(esTable.getIndexName(), esTable.getMappingType()); Review comment: I think `JSONxxxx` types should probably not appear in this file. ########## File path: fe/src/main/java/org/apache/doris/external/EsRestClient.java ########## @@ -46,55 +52,109 @@ } private static OkHttpClient networkClient = new OkHttpClient.Builder() - .readTimeout(10, TimeUnit.SECONDS) - .build(); + .readTimeout(10, TimeUnit.SECONDS) + .build(); - private String basicAuth; - - private int nextClient = 0; + private Request.Builder builder; private String[] nodes; private String currentNode; + private int currentNodeIndex = 0; public EsRestClient(String[] nodes, String authUser, String authPassword) { this.nodes = 
nodes; + this.builder = new Request.Builder(); if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) { - basicAuth = Credentials.basic(authUser, authPassword); + this.builder.addHeader(HttpHeaders.AUTHORIZATION, Credentials.basic(authUser, authPassword)); } - selectNextNode(); + this.currentNode = nodes[currentNodeIndex]; } - private boolean selectNextNode() { - if (nextClient >= nodes.length) { - return false; + private void selectNextNode() { + currentNodeIndex++; + // reroute, because the previously failed node may have already been restored + if (currentNodeIndex >= nodes.length) { + currentNodeIndex = 0; } - currentNode = nodes[nextClient++]; - return true; + currentNode = nodes[currentNodeIndex]; } public Map<String, EsNodeInfo> getHttpNodes() throws Exception { Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes"); if (nodesData == null) { return Collections.emptyMap(); } - Map<String, EsNodeInfo> nodes = new HashMap<>(); + Map<String, EsNodeInfo> nodesMap = new HashMap<>(); for (Map.Entry<String, Map<String, Object>> entry : nodesData.entrySet()) { EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); if (node.hasHttp()) { - nodes.put(node.getId(), node); + nodesMap.put(node.getId(), node); } } - return nodes; + return nodesMap; + } + + public JSONObject getIndexProperties(String indexName, String mappingType) { + String path = indexName + "/_mapping"; + String indexMapping = execute(path); + if (indexMapping == null) { + return null; + } + return parseProperties(indexMapping, mappingType); } - public String getIndexMetaData(String indexName) { - String path = "_cluster/state?indices=" + indexName - + "&metric=routing_table,nodes,metadata&expand_wildcards=open"; - return execute(path); + public JSONObject parseProperties(String indexMapping, String mappingType) { + JSONObject jsonObject = new JSONObject(indexMapping); + // the indexName use alias takes the first mapping + Iterator<String> keys = 
jsonObject.keys(); + String docKey = keys.next(); + JSONObject docData = jsonObject.optJSONObject(docKey); + JSONObject mappings = docData.optJSONObject("mappings"); + JSONObject rootSchema = mappings.optJSONObject(mappingType); + return rootSchema.optJSONObject("properties"); + } + + public EsIndexState getIndexState(String indexName) { + String path = indexName + "/_search_shards"; + String shardLocation = execute(path); + if (shardLocation == null) { + return null; + } + return parseIndexState(indexName, shardLocation); + } + public EsIndexState parseIndexState(String indexName, String shardLocation) { + EsIndexState indexState = new EsIndexState(indexName); + JSONObject jsonObject = new JSONObject(shardLocation); + JSONObject nodesMap = jsonObject.getJSONObject("nodes"); + JSONArray shards = jsonObject.getJSONArray("shards"); + int length = shards.length(); + for (int i = 0; i < length; i++) { + List<EsShardRouting> singleShardRouting = Lists.newArrayList(); + JSONArray shardsArray = shards.getJSONArray(i); + int arrayLength = shardsArray.length(); + for (int j = 0; j < arrayLength; j++) { + JSONObject shard = shardsArray.getJSONObject(j); + String shardState = shard.getString("state"); + if ("STARTED".equalsIgnoreCase(shardState) || "RELOCATING".equalsIgnoreCase(shardState)) { Review comment: relocating should also be taken into account. 
########## File path: fe/src/main/java/org/apache/doris/external/EsStateStore.java ########## @@ -17,94 +17,95 @@ package org.apache.doris.external; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.EsTable; -import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.RangePartitionInfo; import org.apache.doris.catalog.SinglePartitionInfo; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Table.TableType; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.MasterDaemon; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; -import com.google.common.collect.Range; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.json.JSONObject; -import java.io.IOException; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import okhttp3.Authenticator; -import okhttp3.Call; -import okhttp3.Credentials; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import okhttp3.Route; - /** * it is used to call es api to get shard allocation state */ public class EsStateStore extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(EsStateStore.class); private Map<Long, EsTable> esTables; + private Map<Long, EsRestClient> esClients; + public EsStateStore() { super("es state store", Config.es_state_sync_interval_second * 
1000); esTables = Maps.newConcurrentMap(); + esClients = Maps.newConcurrentMap(); } public void registerTable(EsTable esTable) { if (Catalog.isCheckpointThread()) { return; } esTables.put(esTable.getId(), esTable); - LOG.info("register a new table [{}] to sync list", esTable.toString()); + esClients.put(esTable.getId(), + new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd())); + LOG.info("register a new table [{}] to sync list", esTable); } public void deRegisterTable(long tableId) { esTables.remove(tableId); + esClients.remove(tableId); LOG.info("deregister table [{}] from sync list", tableId); } - + @Override protected void runAfterCatalogReady() { for (EsTable esTable : esTables.values()) { try { - EsRestClient client = new EsRestClient(esTable.getSeeds(), - esTable.getUserName(), esTable.getPasswd()); - // if user not specify the es version, try to get the remote cluster versoin - // in the future, we maybe need this version - String indexMetaData = client.getIndexMetaData(esTable.getIndexName()); - if (indexMetaData == null) { + EsRestClient client = esClients.get(esTable.getId()); + + if (esTable.isKeywordSniffEnable() || esTable.isDocValueScanEnable()) { + JSONObject properties = client.getIndexProperties(esTable.getIndexName(), esTable.getMappingType()); + if (properties == null) { + continue; + } + setEsTableContext(properties, esTable); + } + + EsIndexState esIndexState = client.getIndexState(esTable.getIndexName()); + if (esIndexState == null) { continue; } - EsTableState esTableState = parseClusterState55(indexMetaData, esTable); + + EsTableState esTableState = setTableStatePartitionInfo(esTable, esIndexState); Review comment: `setTableStatePartitionInfo` is very long and may not reflect the meaning of this function. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org