gong commented on code in PR #7764: URL: https://github.com/apache/inlong/pull/7764#discussion_r1261916925
########## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java: ########## @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.doris.schema; + +import org.apache.inlong.sort.base.dirty.DirtySinkHelper; +import org.apache.inlong.sort.base.dirty.DirtyType; +import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat; +import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData; +import org.apache.inlong.sort.base.schema.SchemaChangeHandleException; +import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy; +import org.apache.inlong.sort.doris.http.HttpGetEntity; +import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn; +import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation; +import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation; +import org.apache.inlong.sort.protocol.ddl.operations.Operation; +import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy; +import org.apache.inlong.sort.protocol.enums.SchemaChangeType; +import org.apache.inlong.sort.util.SchemaChangeUtils; + +import 
org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.shaded.org.apache.commons.codec.binary.Base64; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.util.Preconditions; +import org.apache.http.HttpHeaders; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.StringJoiner; + +/** + * Schema change helper + */ +public class SchemaChangeHelper { + + private static final Logger LOGGER = LoggerFactory.getLogger(SchemaChangeHelper.class); + + private static final String CHECK_LIGHT_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s"; + private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; + private static final String DORIS_HTTP_CALL_SUCCESS = "0"; + private static final String CONTENT_TYPE_JSON = "application/json"; + private final boolean schemaChange; + private final Map<SchemaChangeType, SchemaChangePolicy> policyMap; + private final DorisOptions options; + private final JsonDynamicSchemaFormat dynamicSchemaFormat; + private final String databasePattern; + private final String tablePattern; + private final int maxRetries; + private final OperationHelper operationHelper; + private final 
SchemaUpdateExceptionPolicy exceptionPolicy; + private final SinkTableMetricData metricData; + private final DirtySinkHelper<Object> dirtySinkHelper; + + private SchemaChangeHelper(JsonDynamicSchemaFormat dynamicSchemaFormat, DorisOptions options, boolean schemaChange, + Map<SchemaChangeType, SchemaChangePolicy> policyMap, String databasePattern, String tablePattern, + int maxRetries, SchemaUpdateExceptionPolicy exceptionPolicy, + SinkTableMetricData metricData, DirtySinkHelper<Object> dirtySinkHelper) { + this.dynamicSchemaFormat = Preconditions.checkNotNull(dynamicSchemaFormat, "dynamicSchemaFormat is null"); + this.options = Preconditions.checkNotNull(options, "doris options is null"); + this.schemaChange = schemaChange; + this.policyMap = policyMap; + this.databasePattern = databasePattern; + this.tablePattern = tablePattern; + this.maxRetries = maxRetries; + this.exceptionPolicy = exceptionPolicy; + this.metricData = metricData; + this.dirtySinkHelper = dirtySinkHelper; + operationHelper = OperationHelper.of(dynamicSchemaFormat); + } + + public static SchemaChangeHelper of(JsonDynamicSchemaFormat dynamicSchemaFormat, DorisOptions options, + boolean schemaChange, Map<SchemaChangeType, SchemaChangePolicy> policyMap, String databasePattern, + String tablePattern, int maxRetries, SchemaUpdateExceptionPolicy exceptionPolicy, + SinkTableMetricData metricData, DirtySinkHelper<Object> dirtySinkHelper) { + return new SchemaChangeHelper(dynamicSchemaFormat, options, schemaChange, policyMap, databasePattern, + tablePattern, maxRetries, exceptionPolicy, metricData, dirtySinkHelper); + } + + /** + * Process schema change for Doris + * + * @param data The origin data + */ + public void process(byte[] originData, JsonNode data) { + if (!schemaChange) { + return; + } + String database; + String table; + try { + database = dynamicSchemaFormat.parse(data, databasePattern); + table = dynamicSchemaFormat.parse(data, tablePattern); + } catch (Exception e) { + if (exceptionPolicy 
== SchemaUpdateExceptionPolicy.THROW_WITH_STOP) { + throw new SchemaChangeHandleException( + String.format("Parse database, table from origin data failed, origin data: %s", + new String(originData)), + e); + } + LOGGER.warn("Parse database, table from origin data failed, origin data: {}", new String(originData), e); + if (exceptionPolicy == SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) { + dirtySinkHelper.invoke(new String(originData), DirtyType.JSON_PROCESS_ERROR, e); + } + if (metricData != null) { + metricData.invokeDirty(1, originData.length); + } + return; + } + Operation operation; + try { + JsonNode operationNode = Preconditions.checkNotNull(data.get("operation"), + "Operation node is null"); + operation = Preconditions.checkNotNull( + dynamicSchemaFormat.objectMapper.convertValue(operationNode, new TypeReference<Operation>() { + }), "Operation is null"); + } catch (Exception e) { + if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) { + throw new SchemaChangeHandleException( + String.format("Extract Operation from origin data failed,origin data: %s", data), e); + } + LOGGER.warn("Extract Operation from origin data failed,origin data: {}", data, e); + handleDirtyData(data, originData, database, table, DirtyType.JSON_PROCESS_ERROR, e); + return; + } + String originSchema = dynamicSchemaFormat.extractDDL(data); + SchemaChangeType type = SchemaChangeUtils.extractSchemaChangeType(operation); + if (type == null) { + LOGGER.warn("Unsupported for schema-change: {}", originSchema); + return; + } + switch (type) { + case ALTER: + handleAlterOperation(database, table, originData, originSchema, data, (AlterOperation) operation); + break; + case CREATE_TABLE: + doCreateTable(originData, database, table, type, originSchema, data, (CreateTableOperation) operation); + break; + case DROP_TABLE: + doDropTable(type, originSchema); + break; + case RENAME_TABLE: + doRenameTable(type, originSchema); + break; + case TRUNCATE_TABLE: + doTruncateTable(type, 
originSchema); + break; + default: + LOGGER.warn("Unsupported for {}: {}", type, originSchema); + } + } + + private void handleDirtyData(JsonNode data, byte[] originData, String database, + String table, DirtyType dirtyType, Throwable e) { + if (exceptionPolicy == SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) { + String label = parseValue(data, dirtySinkHelper.getDirtyOptions().getLabels()); + String logTag = parseValue(data, dirtySinkHelper.getDirtyOptions().getLogTag()); + String identifier = parseValue(data, dirtySinkHelper.getDirtyOptions().getIdentifier()); + dirtySinkHelper.invoke(new String(originData), dirtyType, label, logTag, identifier, e); + } + if (metricData != null) { + metricData.outputDirtyMetricsWithEstimate(database, table, 1, originData.length); + } + } + + private void reportMetric(String database, String table, int len) { + if (metricData != null) { + metricData.outputMetrics(database, table, 1, len); + } + } + + private String parseValue(JsonNode data, String pattern) { + try { + return dynamicSchemaFormat.parse(data, pattern); + } catch (Exception e) { + LOGGER.warn("Parse value from pattern failed,the pattern: {}, data: {}", pattern, data); + } + return pattern; + } + + private void handleAlterOperation(String database, String table, byte[] originData, + String originSchema, JsonNode data, AlterOperation operation) { + if (operation.getAlterColumns() == null || operation.getAlterColumns().isEmpty()) { + if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) { + throw new SchemaChangeHandleException( + String.format("Alter columns is empty, origin schema: %s", originSchema)); + } + LOGGER.warn("Alter columns is empty, origin schema: {}", originSchema); + return; + } + Map<SchemaChangeType, List<AlterColumn>> typeMap = new LinkedHashMap<>(); + for (AlterColumn alterColumn : operation.getAlterColumns()) { + Set<SchemaChangeType> types = null; + try { + types = SchemaChangeUtils.extractSchemaChangeType(alterColumn); + 
Preconditions.checkState(!types.isEmpty(), "Schema change types is empty"); + } catch (Exception e) { + if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) { + throw new SchemaChangeHandleException( + String.format("Extract schema change type failed, origin schema: %s", originSchema), e); + } + LOGGER.warn("Extract schema change type failed, origin schema: {}", originSchema, e); + } + if (types == null) { + continue; + } + if (types.size() == 1) { + SchemaChangeType type = types.stream().findFirst().get(); + typeMap.computeIfAbsent(type, k -> new ArrayList<>()).add(alterColumn); + } else { + // Handle change column, it only exists change column type and rename column in this scenario for now. + for (SchemaChangeType type : types) { + SchemaChangePolicy policy = policyMap.get(type); + if (policy == SchemaChangePolicy.ENABLE) { + LOGGER.warn("Unsupported for {}: {}", type, originSchema); + } else { + doSchemaChangeBase(type, policy, originSchema); + } + } + } + } + if (!typeMap.isEmpty()) { + doAlterOperation(database, table, originData, originSchema, data, typeMap); + } + } + + private void doAlterOperation(String database, String table, byte[] originData, String originSchema, JsonNode data, + Map<SchemaChangeType, List<AlterColumn>> typeMap) { + StringJoiner joiner = new StringJoiner(","); + for (Entry<SchemaChangeType, List<AlterColumn>> kv : typeMap.entrySet()) { + SchemaChangePolicy policy = policyMap.get(kv.getKey()); + doSchemaChangeBase(kv.getKey(), policy, originSchema); + if (policy == SchemaChangePolicy.ENABLE) { + String alterStatement = null; + try { + switch (kv.getKey()) { + case ADD_COLUMN: + alterStatement = doAddColumn(kv.getValue()); + break; + case DROP_COLUMN: + alterStatement = doDropColumn(kv.getValue()); + break; + case RENAME_COLUMN: + alterStatement = doRenameColumn(kv.getKey(), originSchema); + break; + case CHANGE_COLUMN_TYPE: + alterStatement = doChangeColumnType(kv.getKey(), originSchema); + break; + default: + } + } 
catch (Exception e) { + if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) { + throw new SchemaChangeHandleException( + String.format("Build alter statement failed, origin schema: %s", originSchema), e); + } + LOGGER.warn("Build alter statement failed, origin schema: {}", originSchema, e); + } + if (alterStatement != null) { + joiner.add(alterStatement); + } + } + } + String statement = joiner.toString(); + if (statement.length() != 0) { + try { + String alterStatementCommon = operationHelper.buildAlterStatementCommon(database, table); + statement = alterStatementCommon + statement; + // The checkLightSchemaChange is removed because most scenarios support it + boolean result = executeStatement(database, statement); + if (!result) { + LOGGER.error("Alter table failed,statement: {}", statement); + throw new SchemaChangeHandleException(String.format("Add column failed,statement: %s", statement)); + } + LOGGER.info("Alter table success,statement: {}", statement); + reportMetric(database, table, originData.length); + } catch (Exception e) { + if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) { + throw new SchemaChangeHandleException( + String.format("Alter table failed, origin schema: %s", originSchema), e); + } + handleDirtyData(data, originData, database, table, DirtyType.HANDLE_ALTER_TABLE_ERROR, e); + } + } + } + + private String doChangeColumnType(SchemaChangeType type, String originSchema) { + LOGGER.warn("Unsupported for {}: {}", type, originSchema); + return null; + } + + private String doRenameColumn(SchemaChangeType type, String originSchema) { + LOGGER.warn("Unsupported for {}: {}", type, originSchema); + return null; + } + + private String doDropColumn(List<AlterColumn> alterColumns) { + return operationHelper.buildDropColumnStatement(alterColumns); + } + + private String doAddColumn(List<AlterColumn> alterColumns) { + return operationHelper.buildAddColumnStatement(alterColumns); + } + + private void 
doTruncateTable(SchemaChangeType type, String originSchema) { + SchemaChangePolicy policy = policyMap.get(SchemaChangeType.TRUNCATE_TABLE); + if (policy == SchemaChangePolicy.ENABLE) { + LOGGER.warn("Unsupported for {}: {}", type, originSchema); + return; + } + doSchemaChangeBase(type, policy, originSchema); + } + + private void doRenameTable(SchemaChangeType type, String originSchema) { + SchemaChangePolicy policy = policyMap.get(SchemaChangeType.RENAME_TABLE); + if (policy == SchemaChangePolicy.ENABLE) { + LOGGER.warn("Unsupported for {}: {}", type, originSchema); + return; + } + doSchemaChangeBase(type, policy, originSchema); + } + + private void doDropTable(SchemaChangeType type, String originSchema) { + SchemaChangePolicy policy = policyMap.get(SchemaChangeType.DROP_TABLE); + if (policy == SchemaChangePolicy.ENABLE) { + LOGGER.warn("Unsupported for {}: {}", type, originSchema); + return; + } + doSchemaChangeBase(type, policy, originSchema); + } + + private void doCreateTable(byte[] originData, String database, String table, SchemaChangeType type, + String originSchema, JsonNode data, CreateTableOperation operation) { + SchemaChangePolicy policy = policyMap.get(type); + if (policy == SchemaChangePolicy.ENABLE) { + try { + List<String> primaryKeys = dynamicSchemaFormat.extractPrimaryKeyNames(data); + String stmt = operationHelper.buildCreateTableStatement(database, table, primaryKeys, operation); + boolean result = executeStatement(database, stmt); + if (!result) { + LOGGER.error("Create table failed,statement: {}", stmt); + throw new IOException(String.format("Create table failed,statement: %s", stmt)); + } + reportMetric(database, table, originData.length); + return; + } catch (Exception e) { + if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) { + throw new SchemaChangeHandleException( + String.format("Drop column failed, origin schema: %s", originSchema), e); + } + handleDirtyData(data, originData, database, table, 
DirtyType.CREATE_TABLE_ERROR, e); + return; + } + } + doSchemaChangeBase(type, policy, originSchema); + } + + private void doSchemaChangeBase(SchemaChangeType type, SchemaChangePolicy policy, String schema) { + if (policy == null) { + LOGGER.warn("Unsupported for {}: {}", type, schema); + return; + } + switch (policy) { + case LOG: + LOGGER.warn("Unsupported for {}: {}", type, schema); + break; + case ERROR: + throw new SchemaChangeHandleException(String.format("Unsupported for %s: %s", type, schema)); + default: + } + } + + private Map<String, Object> buildRequestParam(String column, boolean dropColumn) { + Map<String, Object> params = new HashMap<>(); + params.put("isDropColumn", dropColumn); + params.put("columnName", column); + return params; + } + + private String authHeader() { + return "Basic " + new String(Base64.encodeBase64((options.getUsername() + ":" + + options.getPassword()).getBytes(StandardCharsets.UTF_8))); + } + + private boolean executeStatement(String database, String stmt) throws IOException { + Map<String, String> param = new HashMap<>(); + param.put("stmt", stmt); + String requestUrl = String.format(SCHEMA_CHANGE_API, options.getFenodes(), database); + HttpPost httpPost = new HttpPost(requestUrl); + httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); + httpPost.setHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE_JSON); + httpPost.setEntity(new StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param))); + return sendRequest(httpPost); + } + + private boolean checkLightSchemaChange(String database, String table, String column, boolean dropColumn) + throws IOException { + String url = String.format(CHECK_LIGHT_SCHEMA_CHANGE_API, options.getFenodes(), database, table); + Map<String, Object> param = buildRequestParam(column, dropColumn); + HttpGetEntity httpGet = new HttpGetEntity(url); + httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); + httpGet.setEntity(new 
StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param))); + boolean success = sendRequest(httpGet); + if (!success) { + LOGGER.warn("schema change can not do table {}.{}", database, table); + } + return success; + } + + @SuppressWarnings("unchecked") + private boolean sendRequest(HttpUriRequest request) { + try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + for (int i = 0; i <= maxRetries; i++) { + try { Review Comment: `i` should start from 1, or the loop condition should be changed to `i < maxRetries`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
