This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 995da44 [fix]Fix the first schema change does not take effect when
the source table has no data (#354)
995da44 is described below
commit 995da442874c34332e70c870a7231e454320f493
Author: wudongliang <[email protected]>
AuthorDate: Thu Mar 28 11:33:13 2024 +0800
[fix]Fix the first schema change does not take effect when the source
table has no data (#354)
---
.../org/apache/doris/flink/rest/RestService.java | 159 +++++++++++++--------
.../flink/sink/schema/SchemaChangeManager.java | 32 +++--
.../JsonDebeziumSchemaChangeImplV2.java | 24 +++-
.../TestJsonDebeziumSchemaChangeImplV2.java | 87 +++++++++--
4 files changed, 210 insertions(+), 92 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 5ac139e..000141c 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.rest;
import org.apache.flink.annotation.VisibleForTesting;
import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
@@ -30,21 +31,30 @@ import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.ConnectedFailedException;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.exception.DorisSchemaChangeException;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.exception.ShouldNeverHappenException;
import org.apache.doris.flink.rest.models.Backend;
import org.apache.doris.flink.rest.models.BackendRow;
import org.apache.doris.flink.rest.models.BackendV2;
+import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;
import org.apache.doris.flink.rest.models.QueryPlan;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.flink.rest.models.Tablet;
import org.apache.doris.flink.sink.BackendUtil;
+import org.apache.doris.flink.sink.HttpGetWithEntity;
+import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import java.io.IOException;
@@ -56,12 +66,12 @@ import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Scanner;
import java.util.Set;
import java.util.stream.Collectors;
@@ -78,13 +88,13 @@ public class RestService implements Serializable {
public static final int REST_RESPONSE_STATUS_OK = 200;
public static final int REST_RESPONSE_CODE_OK = 0;
private static final String REST_RESPONSE_BE_ROWS_KEY = "rows";
- private static final String API_PREFIX = "/api";
- private static final String SCHEMA = "_schema";
- private static final String QUERY_PLAN = "_query_plan";
private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
@Deprecated private static final String BACKENDS =
"/rest/v1/system?path=//backends";
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
private static final String FE_LOGIN = "/rest/v1/login";
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ private static final String TABLE_SCHEMA_API =
"http://%s/api/%s/%s/_schema";
+ private static final String QUERY_PLAN_API =
"http://%s/api/%s/%s/_query_plan";
/**
* send request to Doris FE and get response json string.
@@ -138,13 +148,9 @@ public class RestService implements Serializable {
try {
String response;
if (request instanceof HttpGet) {
- response =
- getConnectionGet(
- request, options.getUsername(),
options.getPassword(), logger);
+ response = getConnectionGet(request, options, logger);
} else {
- response =
- getConnectionPost(
- request, options.getUsername(),
options.getPassword(), logger);
+ response = getConnectionPost(request, options, logger);
}
if (response == null) {
logger.warn(
@@ -177,17 +183,12 @@ public class RestService implements Serializable {
}
private static String getConnectionPost(
- HttpRequestBase request, String user, String passwd, Logger
logger) throws IOException {
+ HttpRequestBase request, DorisOptions dorisOptions, Logger logger)
throws IOException {
URL url = new URL(request.getURI().toString());
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod(request.getMethod());
- String authEncoding =
- Base64.getEncoder()
- .encodeToString(
- String.format("%s:%s", user, passwd)
- .getBytes(StandardCharsets.UTF_8));
- conn.setRequestProperty("Authorization", "Basic " + authEncoding);
+ conn.setRequestProperty("Authorization", authHeader(dorisOptions));
InputStream content = ((HttpPost) request).getEntity().getContent();
String res = IOUtils.toString(content);
conn.setDoOutput(true);
@@ -204,16 +205,11 @@ public class RestService implements Serializable {
}
private static String getConnectionGet(
- HttpRequestBase request, String user, String passwd, Logger
logger) throws IOException {
+ HttpRequestBase request, DorisOptions dorisOptions, Logger logger)
throws IOException {
URL realUrl = new URL(request.getURI().toString());
// open connection
HttpURLConnection connection = (HttpURLConnection)
realUrl.openConnection();
- String authEncoding =
- Base64.getEncoder()
- .encodeToString(
- String.format("%s:%s", user, passwd)
- .getBytes(StandardCharsets.UTF_8));
- connection.setRequestProperty("Authorization", "Basic " +
authEncoding);
+ connection.setRequestProperty("Authorization",
authHeader(dorisOptions));
connection.connect();
connection.setConnectTimeout(request.getConfig().getConnectTimeout());
@@ -323,14 +319,14 @@ public class RestService implements Serializable {
public static String randomBackend(
DorisOptions options, DorisReadOptions readOptions, Logger logger)
throws DorisException, IOException {
- List<BackendV2.BackendRowV2> backends = getBackendsV2(options,
readOptions, logger);
+ List<BackendRowV2> backends = getBackendsV2(options, readOptions,
logger);
logger.trace("Parse beNodes '{}'.", backends);
if (backends == null || backends.isEmpty()) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
throw new IllegalArgumentException("beNodes",
String.valueOf(backends));
}
Collections.shuffle(backends);
- BackendV2.BackendRowV2 backend = backends.get(0);
+ BackendRowV2 backend = backends.get(0);
return backend.getIp() + ":" + backend.getHttpPort();
}
@@ -408,7 +404,7 @@ public class RestService implements Serializable {
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*/
- public static List<BackendV2.BackendRowV2> getBackendsV2(
+ public static List<BackendRowV2> getBackendsV2(
DorisOptions options, DorisReadOptions readOptions, Logger logger)
{
String feNodes = options.getFenodes();
List<String> feNodeList = allEndpoints(feNodes, logger);
@@ -423,7 +419,7 @@ public class RestService implements Serializable {
HttpGet httpGet = new HttpGet(beUrl);
String response = send(options, readOptions, httpGet, logger);
logger.info("Backend Info:{}", response);
- List<BackendV2.BackendRowV2> backends =
parseBackendV2(response, logger);
+ List<BackendRowV2> backends = parseBackendV2(response, logger);
return backends;
} catch (ConnectedFailedException e) {
logger.info(
@@ -444,16 +440,16 @@ public class RestService implements Serializable {
* @param feNodeList
* @return
*/
- private static List<BackendV2.BackendRowV2> convert(List<String>
feNodeList) {
- List<BackendV2.BackendRowV2> nodeList = new ArrayList<>();
+ private static List<BackendRowV2> convert(List<String> feNodeList) {
+ List<BackendRowV2> nodeList = new ArrayList<>();
for (String node : feNodeList) {
String[] split = node.split(":");
- nodeList.add(BackendV2.BackendRowV2.of(split[0],
Integer.valueOf(split[1]), true));
+ nodeList.add(BackendRowV2.of(split[0], Integer.valueOf(split[1]),
true));
}
return nodeList;
}
- static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger
logger) {
+ static List<BackendRowV2> parseBackendV2(String response, Logger logger) {
ObjectMapper mapper = new ObjectMapper();
BackendV2 backend;
try {
@@ -476,32 +472,11 @@ public class RestService implements Serializable {
logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
throw new ShouldNeverHappenException();
}
- List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
+ List<BackendRowV2> backendRows = backend.getBackends();
logger.debug("Parsing schema result is '{}'.", backendRows);
return backendRows;
}
- /**
- * get a valid URI to connect Doris FE.
- *
- * @param options configuration of request
- * @param logger {@link Logger}
- * @return uri string
- * @throws IllegalArgumentException throw when configuration is illegal
- */
- @VisibleForTesting
- static String getUriStr(DorisOptions options, Logger logger) throws
IllegalArgumentException {
- String[] identifier = parseIdentifier(options.getTableIdentifier(),
logger);
- return "http://"
- + randomEndpoint(options.getFenodes(), logger)
- + API_PREFIX
- + "/"
- + identifier[0]
- + "/"
- + identifier[1]
- + "/";
- }
-
/**
* discover Doris table schema from Doris FE.
*
@@ -514,12 +489,72 @@ public class RestService implements Serializable {
DorisOptions options, DorisReadOptions readOptions, Logger logger)
throws DorisException {
logger.trace("Finding schema.");
- HttpGet httpGet = new HttpGet(getUriStr(options, logger) + SCHEMA);
+ String[] tableIdentifier =
parseIdentifier(options.getTableIdentifier(), logger);
+ String tableSchemaUri =
+ String.format(
+ TABLE_SCHEMA_API,
+ randomEndpoint(options.getFenodes(), logger),
+ tableIdentifier[0],
+ tableIdentifier[1]);
+ HttpGet httpGet = new HttpGet(tableSchemaUri);
String response = send(options, readOptions, httpGet, logger);
logger.debug("Find schema response is '{}'.", response);
return parseSchema(response, logger);
}
+ public static Schema getSchema(
+ DorisOptions dorisOptions, String db, String table, Logger logger)
{
+ logger.trace("start get " + db + "." + table + " schema from doris.");
+ Object responseData = null;
+ try {
+ String tableSchemaUri =
+ String.format(
+ TABLE_SCHEMA_API,
+ randomEndpoint(dorisOptions.getFenodes(), logger),
+ db,
+ table);
+ HttpGetWithEntity httpGet = new HttpGetWithEntity(tableSchemaUri);
+ httpGet.setHeader(HttpHeaders.AUTHORIZATION,
authHeader(dorisOptions));
+ Map<String, Object> responseMap = handleResponse(httpGet, logger);
+ responseData = responseMap.get("data");
+ String schemaStr = objectMapper.writeValueAsString(responseData);
+ return objectMapper.readValue(schemaStr, Schema.class);
+ } catch (JsonProcessingException | IllegalArgumentException e) {
+ throw new DorisSchemaChangeException(
+ "can not parse response schema " + responseData, e);
+ }
+ }
+
+ private static Map handleResponse(HttpUriRequest request, Logger logger) {
+ try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+ CloseableHttpResponse response = httpclient.execute(request);
+ final int statusCode = response.getStatusLine().getStatusCode();
+ final String reasonPhrase =
response.getStatusLine().getReasonPhrase();
+ if (statusCode == 200 && response.getEntity() != null) {
+ String responseEntity =
EntityUtils.toString(response.getEntity());
+ return objectMapper.readValue(responseEntity, Map.class);
+ } else {
+ throw new DorisSchemaChangeException(
+ "Failed to schemaChange, status: "
+ + statusCode
+ + ", reason: "
+ + reasonPhrase);
+ }
+ } catch (Exception e) {
+ logger.trace("SchemaChange request error,", e);
+ throw new DorisSchemaChangeException(
+ "SchemaChange request error with " + e.getMessage());
+ }
+ }
+
+ private static String authHeader(DorisOptions dorisOptions) {
+ return "Basic "
+ + new String(
+ org.apache.commons.codec.binary.Base64.encodeBase64(
+ (dorisOptions.getUsername() + ":" +
dorisOptions.getPassword())
+ .getBytes(StandardCharsets.UTF_8)));
+ }
+
public static boolean isUniqueKeyType(
DorisOptions options, DorisReadOptions readOptions, Logger logger)
throws DorisRuntimeException {
@@ -606,8 +641,14 @@ public class RestService implements Serializable {
sql += " where " + readOptions.getFilterQuery();
}
logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
-
- HttpPost httpPost = new HttpPost(getUriStr(options, logger) +
QUERY_PLAN);
+ String[] tableIdentifier =
parseIdentifier(options.getTableIdentifier(), logger);
+ String queryPlanUri =
+ String.format(
+ QUERY_PLAN_API,
+ options.getFenodes(),
+ tableIdentifier[0],
+ tableIdentifier[1]);
+ HttpPost httpPost = new HttpPost(queryPlanUri);
String entity = "{\"sql\": \"" + sql + "\"}";
logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
StringEntity stringEntity = new StringEntity(entity,
StandardCharsets.UTF_8);
@@ -683,7 +724,7 @@ public class RestService implements Serializable {
static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan,
Logger logger)
throws DorisException {
Map<String, List<Long>> be2Tablets = new HashMap<>();
- for (Map.Entry<String, Tablet> part :
queryPlan.getPartitions().entrySet()) {
+ for (Entry<String, Tablet> part :
queryPlan.getPartitions().entrySet()) {
logger.debug("Parse tablet info: '{}'.", part);
long tabletId;
try {
@@ -777,7 +818,7 @@ public class RestService implements Serializable {
throws IllegalArgumentException {
int tabletsSize = tabletCountLimitForOnePartition(readOptions, logger);
List<PartitionDefinition> partitions = new ArrayList<>();
- for (Map.Entry<String, List<Long>> beInfo : be2Tablets.entrySet()) {
+ for (Entry<String, List<Long>> beInfo : be2Tablets.entrySet()) {
logger.debug("Generate partition with beInfo: '{}'.", beInfo);
HashSet<Long> tabletSet = new HashSet<>(beInfo.getValue());
beInfo.getValue().clear();
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
index 979e353..2aca3c7 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
@@ -134,7 +134,19 @@ public class SchemaChangeManager implements Serializable {
HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpGet.setEntity(new
StringEntity(objectMapper.writeValueAsString(params)));
- return handleResponse(httpGet);
+ String responseEntity = "";
+ Map<String, Object> responseMap = handleResponse(httpGet,
responseEntity);
+ return handleSchemaChange(responseMap, responseEntity);
+ }
+
+ private boolean handleSchemaChange(Map<String, Object> responseMap, String
responseEntity) {
+ String code = responseMap.getOrDefault("code", "-1").toString();
+ if (code.equals("0")) {
+ return true;
+ } else {
+ throw new DorisSchemaChangeException(
+ "Failed to schemaChange, response: " + responseEntity);
+ }
}
/** execute sql in doris. */
@@ -145,7 +157,9 @@ public class SchemaChangeManager implements Serializable {
}
LOG.info("Execute SQL: {}", ddl);
HttpPost httpPost = buildHttpPost(ddl, database);
- return handleResponse(httpPost);
+ String responseEntity = "";
+ Map<String, Object> responseMap = handleResponse(httpPost,
responseEntity);
+ return handleSchemaChange(responseMap, responseEntity);
}
public HttpPost buildHttpPost(String ddl, String database)
@@ -164,21 +178,15 @@ public class SchemaChangeManager implements Serializable {
return httpPost;
}
- private boolean handleResponse(HttpUriRequest request) {
+ private Map<String, Object> handleResponse(HttpUriRequest request, String
responseEntity) {
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
CloseableHttpResponse response = httpclient.execute(request);
final int statusCode = response.getStatusLine().getStatusCode();
final String reasonPhrase =
response.getStatusLine().getReasonPhrase();
if (statusCode == 200 && response.getEntity() != null) {
- String loadResult = EntityUtils.toString(response.getEntity());
- Map<String, Object> responseMap =
objectMapper.readValue(loadResult, Map.class);
- String code = responseMap.getOrDefault("code",
"-1").toString();
- if (code.equals("0")) {
- return true;
- } else {
- throw new DorisSchemaChangeException(
- "Failed to schemaChange, response: " + loadResult);
- }
+ responseEntity = EntityUtils.toString(response.getEntity());
+ Map<String, Object> responseMap =
objectMapper.readValue(responseEntity, Map.class);
+ return responseMap;
} else {
throw new DorisSchemaChangeException(
"Failed to schemaChange, status: "
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 7973212..9b41e2f 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -31,6 +31,9 @@ import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.Field;
+import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
@@ -365,13 +368,22 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
}
}
} else {
- LOG.error(
- "Current schema change failed! You need to ensure that "
- + "there is data in the table."
- + dorisOptions.getTableIdentifier());
+ // In order to be compatible with column changes, the data is
empty or started from
+ // flink checkpoint, resulting in the originFieldSchemaMap not
being filled.
+ LOG.info(tableName + " fill origin field schema from doris
schema.");
fieldSchemaMap = new LinkedHashMap<>();
- Map<String, FieldSchema> finalFieldSchemaMap = fieldSchemaMap;
- columns.forEach(column -> buildFieldSchema(finalFieldSchemaMap,
column));
+ String[] splitTableName = tableName.split("\\.");
+ Schema schema =
+ RestService.getSchema(dorisOptions, splitTableName[0],
splitTableName[1], LOG);
+ List<Field> columnFields = schema.getProperties();
+ for (Field column : columnFields) {
+ String columnName = column.getName();
+ String columnType = column.getType();
+ String columnComment = column.getComment();
+ // TODO need to fill column with default value
+ fieldSchemaMap.put(
+ columnName, new FieldSchema(columnName, columnType,
columnComment));
+ }
originFieldSchemaMap.put(tableName, fieldSchemaMap);
}
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 89d1ffc..2c0e917 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -23,10 +23,15 @@ import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.MockedStatic;
import java.io.IOException;
import java.util.Arrays;
@@ -37,11 +42,15 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mockStatic;
+
/** Test for JsonDebeziumSchemaChangeImplV2. */
public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBase {
private JsonDebeziumSchemaChangeImplV2 schemaChange;
private JsonDebeziumChangeContext changeContext;
+ private MockedStatic<RestService> mockRestService;
@Before
public void setUp() {
@@ -49,6 +58,18 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBa
String sourceTableName = null;
String targetDatabase = "TESTDB";
Map<String, String> tableProperties = new HashMap<>();
+ String schemaStr =
+
"{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"name\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"}],\"status\":200}";
+ Schema schema = null;
+ try {
+ schema = objectMapper.readValue(schemaStr, Schema.class);
+ } catch (JsonProcessingException e) {
+ throw new DorisRuntimeException(e);
+ }
+ mockRestService = mockStatic(RestService.class);
+ mockRestService
+ .when(() -> RestService.getSchema(any(), any(), any(), any()))
+ .thenReturn(schema);
changeContext =
new JsonDebeziumChangeContext(
dorisOptions,
@@ -160,6 +181,7 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBa
srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "100", null));
String tableName = "db.test_fill";
+ schemaChange.setOriginFieldSchemaMap(buildOriginFiledSchema());
schemaChange.setSourceConnector("mysql");
String columnsString =
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"
[...]
@@ -169,6 +191,32 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBa
schemaChange.getOriginFieldSchemaMap();
Map<String, FieldSchema> fieldSchemaMap =
originFieldSchemaMap.get(tableName);
+ eqFiledSchema(fieldSchemaMap, srcFiledSchemaMap);
+ }
+
+ @Test
+ public void testFillOriginSchemaWithoutFiledSchema() throws IOException {
+ Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
+ srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, ""));
+ srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR", null,
""));
+
+ schemaChange.setSourceConnector("mysql");
+ String tableName = "test.test_sink3";
+ String columnsString =
+
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"0\",\"enumValues\":[]},{\"name\":\"c1\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"
[...]
+ JsonNode columns = objectMapper.readTree(columnsString);
+ schemaChange.fillOriginSchema(tableName, columns);
+
+ Map<String, Map<String, FieldSchema>> originFieldSchemaMap =
+ schemaChange.getOriginFieldSchemaMap();
+ Assert.assertTrue(originFieldSchemaMap.containsKey(tableName));
+ Map<String, FieldSchema> fieldSchemaMap =
originFieldSchemaMap.get(tableName);
+
+ eqFiledSchema(fieldSchemaMap, srcFiledSchemaMap);
+ }
+
+ private void eqFiledSchema(
+ Map<String, FieldSchema> fieldSchemaMap, Map<String, FieldSchema>
srcFiledSchemaMap) {
Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
fieldSchemaMap.entrySet().iterator();
for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
@@ -416,6 +464,7 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBa
@Test
public void testDateTimeFullOrigin() throws JsonProcessingException {
+ Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new
LinkedHashMap<>();
Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
srcFiledSchemaMap.put(
@@ -439,28 +488,36 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBa
new FieldSchema("test_ts_6", "DATETIMEV2(6)",
"current_timestamp", null));
String tableName = "db.test_fill";
+ originFieldSchemaMap.put(tableName, buildDatetimeFieldSchemaMap());
schemaChange.setSourceConnector("mysql");
+ schemaChange.setOriginFieldSchemaMap(originFieldSchemaMap);
String columnsString =
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"test_dt_0\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValu
[...]
JsonNode columns = objectMapper.readTree(columnsString);
schemaChange.fillOriginSchema(tableName, columns);
- Map<String, Map<String, FieldSchema>> originFieldSchemaMap =
+ Map<String, Map<String, FieldSchema>> targetOriginFieldSchemaMap =
schemaChange.getOriginFieldSchemaMap();
- Map<String, FieldSchema> fieldSchemaMap =
originFieldSchemaMap.get(tableName);
+ Map<String, FieldSchema> fieldSchemaMap =
targetOriginFieldSchemaMap.get(tableName);
- Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
- fieldSchemaMap.entrySet().iterator();
- for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
- FieldSchema srcFiledSchema = entry.getValue();
- Entry<String, FieldSchema> originField =
originFieldSchemaIterator.next();
+ eqFiledSchema(fieldSchemaMap, srcFiledSchemaMap);
+ }
- Assert.assertEquals(entry.getKey(), originField.getKey());
- Assert.assertEquals(srcFiledSchema.getName(),
originField.getValue().getName());
- Assert.assertEquals(
- srcFiledSchema.getTypeString(),
originField.getValue().getTypeString());
- Assert.assertEquals(
- srcFiledSchema.getDefaultValue(),
originField.getValue().getDefaultValue());
- Assert.assertEquals(srcFiledSchema.getComment(),
originField.getValue().getComment());
- }
+ private Map<String, FieldSchema> buildDatetimeFieldSchemaMap() {
+ Map<String, FieldSchema> filedSchemaMap = new LinkedHashMap<>();
+ filedSchemaMap.put("id", new FieldSchema());
+ filedSchemaMap.put("test_dt_0", new FieldSchema());
+ filedSchemaMap.put("test_dt_1", new FieldSchema());
+ filedSchemaMap.put("test_dt_3", new FieldSchema());
+ filedSchemaMap.put("test_dt_6", new FieldSchema());
+ filedSchemaMap.put("test_ts_0", new FieldSchema());
+ filedSchemaMap.put("test_ts_1", new FieldSchema());
+ filedSchemaMap.put("test_ts_3", new FieldSchema());
+ filedSchemaMap.put("test_ts_6", new FieldSchema());
+ return filedSchemaMap;
+ }
+
+ @After
+ public void after() {
+ mockRestService.close();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]