This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 995da44  [fix]Fix the first schema change does not take effect when 
the source  table has no data (#354)
995da44 is described below

commit 995da442874c34332e70c870a7231e454320f493
Author: wudongliang <[email protected]>
AuthorDate: Thu Mar 28 11:33:13 2024 +0800

    [fix]Fix the first schema change does not take effect when the source  
table has no data (#354)
---
 .../org/apache/doris/flink/rest/RestService.java   | 159 +++++++++++++--------
 .../flink/sink/schema/SchemaChangeManager.java     |  32 +++--
 .../JsonDebeziumSchemaChangeImplV2.java            |  24 +++-
 .../TestJsonDebeziumSchemaChangeImplV2.java        |  87 +++++++++--
 4 files changed, 210 insertions(+), 92 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 5ac139e..000141c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.rest;
 import org.apache.flink.annotation.VisibleForTesting;
 
 import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.IOUtils;
@@ -30,21 +31,30 @@ import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.ConnectedFailedException;
 import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.exception.DorisSchemaChangeException;
 import org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.doris.flink.exception.ShouldNeverHappenException;
 import org.apache.doris.flink.rest.models.Backend;
 import org.apache.doris.flink.rest.models.BackendRow;
 import org.apache.doris.flink.rest.models.BackendV2;
+import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;
 import org.apache.doris.flink.rest.models.QueryPlan;
 import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.flink.rest.models.Tablet;
 import org.apache.doris.flink.sink.BackendUtil;
+import org.apache.doris.flink.sink.HttpGetWithEntity;
+import org.apache.http.HttpHeaders;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -56,12 +66,12 @@ import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Scanner;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -78,13 +88,13 @@ public class RestService implements Serializable {
     public static final int REST_RESPONSE_STATUS_OK = 200;
     public static final int REST_RESPONSE_CODE_OK = 0;
     private static final String REST_RESPONSE_BE_ROWS_KEY = "rows";
-    private static final String API_PREFIX = "/api";
-    private static final String SCHEMA = "_schema";
-    private static final String QUERY_PLAN = "_query_plan";
     private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
     @Deprecated private static final String BACKENDS = 
"/rest/v1/system?path=//backends";
     private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
     private static final String FE_LOGIN = "/rest/v1/login";
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static final String TABLE_SCHEMA_API = 
"http://%s/api/%s/%s/_schema";;
+    private static final String QUERY_PLAN_API = 
"http://%s/api/%s/%s/_query_plan";;
 
     /**
      * send request to Doris FE and get response json string.
@@ -138,13 +148,9 @@ public class RestService implements Serializable {
             try {
                 String response;
                 if (request instanceof HttpGet) {
-                    response =
-                            getConnectionGet(
-                                    request, options.getUsername(), 
options.getPassword(), logger);
+                    response = getConnectionGet(request, options, logger);
                 } else {
-                    response =
-                            getConnectionPost(
-                                    request, options.getUsername(), 
options.getPassword(), logger);
+                    response = getConnectionPost(request, options, logger);
                 }
                 if (response == null) {
                     logger.warn(
@@ -177,17 +183,12 @@ public class RestService implements Serializable {
     }
 
     private static String getConnectionPost(
-            HttpRequestBase request, String user, String passwd, Logger 
logger) throws IOException {
+            HttpRequestBase request, DorisOptions dorisOptions, Logger logger) 
throws IOException {
         URL url = new URL(request.getURI().toString());
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
         conn.setInstanceFollowRedirects(false);
         conn.setRequestMethod(request.getMethod());
-        String authEncoding =
-                Base64.getEncoder()
-                        .encodeToString(
-                                String.format("%s:%s", user, passwd)
-                                        .getBytes(StandardCharsets.UTF_8));
-        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
+        conn.setRequestProperty("Authorization", authHeader(dorisOptions));
         InputStream content = ((HttpPost) request).getEntity().getContent();
         String res = IOUtils.toString(content);
         conn.setDoOutput(true);
@@ -204,16 +205,11 @@ public class RestService implements Serializable {
     }
 
     private static String getConnectionGet(
-            HttpRequestBase request, String user, String passwd, Logger 
logger) throws IOException {
+            HttpRequestBase request, DorisOptions dorisOptions, Logger logger) 
throws IOException {
         URL realUrl = new URL(request.getURI().toString());
         // open connection
         HttpURLConnection connection = (HttpURLConnection) 
realUrl.openConnection();
-        String authEncoding =
-                Base64.getEncoder()
-                        .encodeToString(
-                                String.format("%s:%s", user, passwd)
-                                        .getBytes(StandardCharsets.UTF_8));
-        connection.setRequestProperty("Authorization", "Basic " + 
authEncoding);
+        connection.setRequestProperty("Authorization", 
authHeader(dorisOptions));
 
         connection.connect();
         connection.setConnectTimeout(request.getConfig().getConnectTimeout());
@@ -323,14 +319,14 @@ public class RestService implements Serializable {
     public static String randomBackend(
             DorisOptions options, DorisReadOptions readOptions, Logger logger)
             throws DorisException, IOException {
-        List<BackendV2.BackendRowV2> backends = getBackendsV2(options, 
readOptions, logger);
+        List<BackendRowV2> backends = getBackendsV2(options, readOptions, 
logger);
         logger.trace("Parse beNodes '{}'.", backends);
         if (backends == null || backends.isEmpty()) {
             logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
             throw new IllegalArgumentException("beNodes", 
String.valueOf(backends));
         }
         Collections.shuffle(backends);
-        BackendV2.BackendRowV2 backend = backends.get(0);
+        BackendRowV2 backend = backends.get(0);
         return backend.getIp() + ":" + backend.getHttpPort();
     }
 
@@ -408,7 +404,7 @@ public class RestService implements Serializable {
      * @return the chosen one Doris BE node
      * @throws IllegalArgumentException BE nodes is illegal
      */
-    public static List<BackendV2.BackendRowV2> getBackendsV2(
+    public static List<BackendRowV2> getBackendsV2(
             DorisOptions options, DorisReadOptions readOptions, Logger logger) 
{
         String feNodes = options.getFenodes();
         List<String> feNodeList = allEndpoints(feNodes, logger);
@@ -423,7 +419,7 @@ public class RestService implements Serializable {
                 HttpGet httpGet = new HttpGet(beUrl);
                 String response = send(options, readOptions, httpGet, logger);
                 logger.info("Backend Info:{}", response);
-                List<BackendV2.BackendRowV2> backends = 
parseBackendV2(response, logger);
+                List<BackendRowV2> backends = parseBackendV2(response, logger);
                 return backends;
             } catch (ConnectedFailedException e) {
                 logger.info(
@@ -444,16 +440,16 @@ public class RestService implements Serializable {
      * @param feNodeList
      * @return
      */
-    private static List<BackendV2.BackendRowV2> convert(List<String> 
feNodeList) {
-        List<BackendV2.BackendRowV2> nodeList = new ArrayList<>();
+    private static List<BackendRowV2> convert(List<String> feNodeList) {
+        List<BackendRowV2> nodeList = new ArrayList<>();
         for (String node : feNodeList) {
             String[] split = node.split(":");
-            nodeList.add(BackendV2.BackendRowV2.of(split[0], 
Integer.valueOf(split[1]), true));
+            nodeList.add(BackendRowV2.of(split[0], Integer.valueOf(split[1]), 
true));
         }
         return nodeList;
     }
 
-    static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger 
logger) {
+    static List<BackendRowV2> parseBackendV2(String response, Logger logger) {
         ObjectMapper mapper = new ObjectMapper();
         BackendV2 backend;
         try {
@@ -476,32 +472,11 @@ public class RestService implements Serializable {
             logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
             throw new ShouldNeverHappenException();
         }
-        List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
+        List<BackendRowV2> backendRows = backend.getBackends();
         logger.debug("Parsing schema result is '{}'.", backendRows);
         return backendRows;
     }
 
-    /**
-     * get a valid URI to connect Doris FE.
-     *
-     * @param options configuration of request
-     * @param logger {@link Logger}
-     * @return uri string
-     * @throws IllegalArgumentException throw when configuration is illegal
-     */
-    @VisibleForTesting
-    static String getUriStr(DorisOptions options, Logger logger) throws 
IllegalArgumentException {
-        String[] identifier = parseIdentifier(options.getTableIdentifier(), 
logger);
-        return "http://";
-                + randomEndpoint(options.getFenodes(), logger)
-                + API_PREFIX
-                + "/"
-                + identifier[0]
-                + "/"
-                + identifier[1]
-                + "/";
-    }
-
     /**
      * discover Doris table schema from Doris FE.
      *
@@ -514,12 +489,72 @@ public class RestService implements Serializable {
             DorisOptions options, DorisReadOptions readOptions, Logger logger)
             throws DorisException {
         logger.trace("Finding schema.");
-        HttpGet httpGet = new HttpGet(getUriStr(options, logger) + SCHEMA);
+        String[] tableIdentifier = 
parseIdentifier(options.getTableIdentifier(), logger);
+        String tableSchemaUri =
+                String.format(
+                        TABLE_SCHEMA_API,
+                        randomEndpoint(options.getFenodes(), logger),
+                        tableIdentifier[0],
+                        tableIdentifier[1]);
+        HttpGet httpGet = new HttpGet(tableSchemaUri);
         String response = send(options, readOptions, httpGet, logger);
         logger.debug("Find schema response is '{}'.", response);
         return parseSchema(response, logger);
     }
 
+    public static Schema getSchema(
+            DorisOptions dorisOptions, String db, String table, Logger logger) 
{
+        logger.trace("start get " + db + "." + table + " schema from doris.");
+        Object responseData = null;
+        try {
+            String tableSchemaUri =
+                    String.format(
+                            TABLE_SCHEMA_API,
+                            randomEndpoint(dorisOptions.getFenodes(), logger),
+                            db,
+                            table);
+            HttpGetWithEntity httpGet = new HttpGetWithEntity(tableSchemaUri);
+            httpGet.setHeader(HttpHeaders.AUTHORIZATION, 
authHeader(dorisOptions));
+            Map<String, Object> responseMap = handleResponse(httpGet, logger);
+            responseData = responseMap.get("data");
+            String schemaStr = objectMapper.writeValueAsString(responseData);
+            return objectMapper.readValue(schemaStr, Schema.class);
+        } catch (JsonProcessingException | IllegalArgumentException e) {
+            throw new DorisSchemaChangeException(
+                    "can not parse response schema " + responseData, e);
+        }
+    }
+
+    private static Map handleResponse(HttpUriRequest request, Logger logger) {
+        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+            CloseableHttpResponse response = httpclient.execute(request);
+            final int statusCode = response.getStatusLine().getStatusCode();
+            final String reasonPhrase = 
response.getStatusLine().getReasonPhrase();
+            if (statusCode == 200 && response.getEntity() != null) {
+                String responseEntity = 
EntityUtils.toString(response.getEntity());
+                return objectMapper.readValue(responseEntity, Map.class);
+            } else {
+                throw new DorisSchemaChangeException(
+                        "Failed to schemaChange, status: "
+                                + statusCode
+                                + ", reason: "
+                                + reasonPhrase);
+            }
+        } catch (Exception e) {
+            logger.trace("SchemaChange request error,", e);
+            throw new DorisSchemaChangeException(
+                    "SchemaChange request error with " + e.getMessage());
+        }
+    }
+
+    private static String authHeader(DorisOptions dorisOptions) {
+        return "Basic "
+                + new String(
+                        org.apache.commons.codec.binary.Base64.encodeBase64(
+                                (dorisOptions.getUsername() + ":" + 
dorisOptions.getPassword())
+                                        .getBytes(StandardCharsets.UTF_8)));
+    }
+
     public static boolean isUniqueKeyType(
             DorisOptions options, DorisReadOptions readOptions, Logger logger)
             throws DorisRuntimeException {
@@ -606,8 +641,14 @@ public class RestService implements Serializable {
             sql += " where " + readOptions.getFilterQuery();
         }
         logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
-
-        HttpPost httpPost = new HttpPost(getUriStr(options, logger) + 
QUERY_PLAN);
+        String[] tableIdentifier = 
parseIdentifier(options.getTableIdentifier(), logger);
+        String queryPlanUri =
+                String.format(
+                        QUERY_PLAN_API,
+                        options.getFenodes(),
+                        tableIdentifier[0],
+                        tableIdentifier[1]);
+        HttpPost httpPost = new HttpPost(queryPlanUri);
         String entity = "{\"sql\": \"" + sql + "\"}";
         logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
         StringEntity stringEntity = new StringEntity(entity, 
StandardCharsets.UTF_8);
@@ -683,7 +724,7 @@ public class RestService implements Serializable {
     static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, 
Logger logger)
             throws DorisException {
         Map<String, List<Long>> be2Tablets = new HashMap<>();
-        for (Map.Entry<String, Tablet> part : 
queryPlan.getPartitions().entrySet()) {
+        for (Entry<String, Tablet> part : 
queryPlan.getPartitions().entrySet()) {
             logger.debug("Parse tablet info: '{}'.", part);
             long tabletId;
             try {
@@ -777,7 +818,7 @@ public class RestService implements Serializable {
             throws IllegalArgumentException {
         int tabletsSize = tabletCountLimitForOnePartition(readOptions, logger);
         List<PartitionDefinition> partitions = new ArrayList<>();
-        for (Map.Entry<String, List<Long>> beInfo : be2Tablets.entrySet()) {
+        for (Entry<String, List<Long>> beInfo : be2Tablets.entrySet()) {
             logger.debug("Generate partition with beInfo: '{}'.", beInfo);
             HashSet<Long> tabletSet = new HashSet<>(beInfo.getValue());
             beInfo.getValue().clear();
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
index 979e353..2aca3c7 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
@@ -134,7 +134,19 @@ public class SchemaChangeManager implements Serializable {
         HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
         httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
         httpGet.setEntity(new 
StringEntity(objectMapper.writeValueAsString(params)));
-        return handleResponse(httpGet);
+        String responseEntity = "";
+        Map<String, Object> responseMap = handleResponse(httpGet, 
responseEntity);
+        return handleSchemaChange(responseMap, responseEntity);
+    }
+
+    private boolean handleSchemaChange(Map<String, Object> responseMap, String 
responseEntity) {
+        String code = responseMap.getOrDefault("code", "-1").toString();
+        if (code.equals("0")) {
+            return true;
+        } else {
+            throw new DorisSchemaChangeException(
+                    "Failed to schemaChange, response: " + responseEntity);
+        }
     }
 
     /** execute sql in doris. */
@@ -145,7 +157,9 @@ public class SchemaChangeManager implements Serializable {
         }
         LOG.info("Execute SQL: {}", ddl);
         HttpPost httpPost = buildHttpPost(ddl, database);
-        return handleResponse(httpPost);
+        String responseEntity = "";
+        Map<String, Object> responseMap = handleResponse(httpPost, 
responseEntity);
+        return handleSchemaChange(responseMap, responseEntity);
     }
 
     public HttpPost buildHttpPost(String ddl, String database)
@@ -164,21 +178,15 @@ public class SchemaChangeManager implements Serializable {
         return httpPost;
     }
 
-    private boolean handleResponse(HttpUriRequest request) {
+    private Map<String, Object> handleResponse(HttpUriRequest request, String 
responseEntity) {
         try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
             CloseableHttpResponse response = httpclient.execute(request);
             final int statusCode = response.getStatusLine().getStatusCode();
             final String reasonPhrase = 
response.getStatusLine().getReasonPhrase();
             if (statusCode == 200 && response.getEntity() != null) {
-                String loadResult = EntityUtils.toString(response.getEntity());
-                Map<String, Object> responseMap = 
objectMapper.readValue(loadResult, Map.class);
-                String code = responseMap.getOrDefault("code", 
"-1").toString();
-                if (code.equals("0")) {
-                    return true;
-                } else {
-                    throw new DorisSchemaChangeException(
-                            "Failed to schemaChange, response: " + loadResult);
-                }
+                responseEntity = EntityUtils.toString(response.getEntity());
+                Map<String, Object> responseMap = 
objectMapper.readValue(responseEntity, Map.class);
+                return responseMap;
             } else {
                 throw new DorisSchemaChangeException(
                         "Failed to schemaChange, status: "
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 7973212..9b41e2f 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -31,6 +31,9 @@ import org.apache.doris.flink.catalog.doris.DataModel;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.Field;
+import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
 import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
 import org.apache.doris.flink.sink.schema.SchemaChangeManager;
@@ -365,13 +368,22 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
                 }
             }
         } else {
-            LOG.error(
-                    "Current schema change failed! You need to ensure that "
-                            + "there is data in the table."
-                            + dorisOptions.getTableIdentifier());
+            // In order to be compatible with column changes, the data is 
empty or started from
+            // flink checkpoint, resulting in the originFieldSchemaMap not 
being filled.
+            LOG.info(tableName + " fill origin field schema from doris 
schema.");
             fieldSchemaMap = new LinkedHashMap<>();
-            Map<String, FieldSchema> finalFieldSchemaMap = fieldSchemaMap;
-            columns.forEach(column -> buildFieldSchema(finalFieldSchemaMap, 
column));
+            String[] splitTableName = tableName.split("\\.");
+            Schema schema =
+                    RestService.getSchema(dorisOptions, splitTableName[0], 
splitTableName[1], LOG);
+            List<Field> columnFields = schema.getProperties();
+            for (Field column : columnFields) {
+                String columnName = column.getName();
+                String columnType = column.getType();
+                String columnComment = column.getComment();
+                // TODO need to fill column with default value
+                fieldSchemaMap.put(
+                        columnName, new FieldSchema(columnName, columnType, 
columnComment));
+            }
             originFieldSchemaMap.put(tableName, fieldSchemaMap);
         }
     }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 89d1ffc..2c0e917 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -23,10 +23,15 @@ import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.MockedStatic;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -37,11 +42,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mockStatic;
+
 /** Test for JsonDebeziumSchemaChangeImplV2. */
 public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBase {
 
     private JsonDebeziumSchemaChangeImplV2 schemaChange;
     private JsonDebeziumChangeContext changeContext;
+    private MockedStatic<RestService> mockRestService;
 
     @Before
     public void setUp() {
@@ -49,6 +58,18 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
         String sourceTableName = null;
         String targetDatabase = "TESTDB";
         Map<String, String> tableProperties = new HashMap<>();
+        String schemaStr =
+                
"{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"name\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"}],\"status\":200}";
+        Schema schema = null;
+        try {
+            schema = objectMapper.readValue(schemaStr, Schema.class);
+        } catch (JsonProcessingException e) {
+            throw new DorisRuntimeException(e);
+        }
+        mockRestService = mockStatic(RestService.class);
+        mockRestService
+                .when(() -> RestService.getSchema(any(), any(), any(), any()))
+                .thenReturn(schema);
         changeContext =
                 new JsonDebeziumChangeContext(
                         dorisOptions,
@@ -160,6 +181,7 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
         srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "100", null));
 
         String tableName = "db.test_fill";
+        schemaChange.setOriginFieldSchemaMap(buildOriginFiledSchema());
         schemaChange.setSourceConnector("mysql");
         String columnsString =
                 
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"
 [...]
@@ -169,6 +191,32 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
                 schemaChange.getOriginFieldSchemaMap();
         Map<String, FieldSchema> fieldSchemaMap = 
originFieldSchemaMap.get(tableName);
 
+        eqFiledSchema(fieldSchemaMap, srcFiledSchemaMap);
+    }
+
+    @Test
+    public void testFillOriginSchemaWithoutFiledSchema() throws IOException {
+        Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
+        srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, ""));
+        srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR", null, 
""));
+
+        schemaChange.setSourceConnector("mysql");
+        String tableName = "test.test_sink3";
+        String columnsString =
+                
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"0\",\"enumValues\":[]},{\"name\":\"c1\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"
 [...]
+        JsonNode columns = objectMapper.readTree(columnsString);
+        schemaChange.fillOriginSchema(tableName, columns);
+
+        Map<String, Map<String, FieldSchema>> originFieldSchemaMap =
+                schemaChange.getOriginFieldSchemaMap();
+        Assert.assertTrue(originFieldSchemaMap.containsKey(tableName));
+        Map<String, FieldSchema> fieldSchemaMap = 
originFieldSchemaMap.get(tableName);
+
+        eqFiledSchema(fieldSchemaMap, srcFiledSchemaMap);
+    }
+
+    private void eqFiledSchema(
+            Map<String, FieldSchema> fieldSchemaMap, Map<String, FieldSchema> 
srcFiledSchemaMap) {
         Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
                 fieldSchemaMap.entrySet().iterator();
         for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
@@ -416,6 +464,7 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
 
     @Test
     public void testDateTimeFullOrigin() throws JsonProcessingException {
+        Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new 
LinkedHashMap<>();
         Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
         srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
         srcFiledSchemaMap.put(
@@ -439,28 +488,36 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
                 new FieldSchema("test_ts_6", "DATETIMEV2(6)", 
"current_timestamp", null));
 
         String tableName = "db.test_fill";
+        originFieldSchemaMap.put(tableName, buildDatetimeFieldSchemaMap());
         schemaChange.setSourceConnector("mysql");
+        schemaChange.setOriginFieldSchemaMap(originFieldSchemaMap);
         String columnsString =
                 
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"test_dt_0\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValu
 [...]
         JsonNode columns = objectMapper.readTree(columnsString);
         schemaChange.fillOriginSchema(tableName, columns);
-        Map<String, Map<String, FieldSchema>> originFieldSchemaMap =
+        Map<String, Map<String, FieldSchema>> targetOriginFieldSchemaMap =
                 schemaChange.getOriginFieldSchemaMap();
-        Map<String, FieldSchema> fieldSchemaMap = 
originFieldSchemaMap.get(tableName);
+        Map<String, FieldSchema> fieldSchemaMap = 
targetOriginFieldSchemaMap.get(tableName);
 
-        Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
-                fieldSchemaMap.entrySet().iterator();
-        for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
-            FieldSchema srcFiledSchema = entry.getValue();
-            Entry<String, FieldSchema> originField = 
originFieldSchemaIterator.next();
+        eqFiledSchema(fieldSchemaMap, srcFiledSchemaMap);
+    }
 
-            Assert.assertEquals(entry.getKey(), originField.getKey());
-            Assert.assertEquals(srcFiledSchema.getName(), 
originField.getValue().getName());
-            Assert.assertEquals(
-                    srcFiledSchema.getTypeString(), 
originField.getValue().getTypeString());
-            Assert.assertEquals(
-                    srcFiledSchema.getDefaultValue(), 
originField.getValue().getDefaultValue());
-            Assert.assertEquals(srcFiledSchema.getComment(), 
originField.getValue().getComment());
-        }
+    private Map<String, FieldSchema> buildDatetimeFieldSchemaMap() {
+        Map<String, FieldSchema> filedSchemaMap = new LinkedHashMap<>();
+        filedSchemaMap.put("id", new FieldSchema());
+        filedSchemaMap.put("test_dt_0", new FieldSchema());
+        filedSchemaMap.put("test_dt_1", new FieldSchema());
+        filedSchemaMap.put("test_dt_3", new FieldSchema());
+        filedSchemaMap.put("test_dt_6", new FieldSchema());
+        filedSchemaMap.put("test_ts_0", new FieldSchema());
+        filedSchemaMap.put("test_ts_1", new FieldSchema());
+        filedSchemaMap.put("test_ts_3", new FieldSchema());
+        filedSchemaMap.put("test_ts_6", new FieldSchema());
+        return filedSchemaMap;
+    }
+
+    @After
+    public void after() {
+        mockRestService.close();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to