This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 63b1290c [improve]Improve the clarity and detail of the database
synchronization logs (#467)
63b1290c is described below
commit 63b1290cec9d4f60f4e1490e8c00afde9de3bea7
Author: Petrichor <[email protected]>
AuthorDate: Wed Aug 14 10:18:50 2024 +0800
[improve]Improve the clarity and detail of the database synchronization
logs (#467)
---
.../org/apache/doris/flink/tools/cdc/DatabaseSync.java | 4 +++-
.../apache/doris/flink/tools/cdc/JdbcSourceSchema.java | 16 ++++++++++++++--
2 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 2f672adb..5ae44da6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -118,7 +118,9 @@ public abstract class DatabaseSync {
DorisSystem dorisSystem = new DorisSystem(options);
List<SourceSchema> schemaList = getSchemaList();
- Preconditions.checkState(!schemaList.isEmpty(), "No tables to be
synchronized.");
+ Preconditions.checkState(
+ !schemaList.isEmpty(),
+ "No tables to be synchronized. Please make sure whether the
tables that need to be synchronized exist in the corresponding database or
schema.");
if (!StringUtils.isNullOrWhitespaceOnly(database)
&& !dorisSystem.databaseExists(database)) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
index b421affb..31cfd1cb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
@@ -17,7 +17,11 @@
package org.apache.doris.flink.tools.cdc;
+import org.apache.flink.util.Preconditions;
+
import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
@@ -31,6 +35,7 @@ import java.util.List;
* databases.
*/
public abstract class JdbcSourceSchema extends SourceSchema {
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcSourceSchema.class);
public JdbcSourceSchema(
DatabaseMetaData metaData,
@@ -48,7 +53,7 @@ public abstract class JdbcSourceSchema extends SourceSchema {
DatabaseMetaData metaData, String databaseName, String schemaName,
String tableName)
throws SQLException {
LinkedHashMap<String, FieldSchema> fields = new LinkedHashMap<>();
- //
+ LOG.debug("Starting to get column info for table: {}", tableName);
try (ResultSet rs = metaData.getColumns(databaseName, schemaName,
tableName, null)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
@@ -63,10 +68,17 @@ public abstract class JdbcSourceSchema extends SourceSchema
{
if (rs.wasNull()) {
scale = null;
}
- String dorisTypeStr = convertToDorisType(fieldType, precision,
scale);
+ String dorisTypeStr = null;
+ try {
+ dorisTypeStr = convertToDorisType(fieldType, precision,
scale);
+ } catch (UnsupportedOperationException e) {
+ throw new UnsupportedOperationException(e + " in table: "
+ tableName);
+ }
fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr,
comment));
}
}
+ Preconditions.checkArgument(!fields.isEmpty(), "The column info of {}
is empty", tableName);
+ LOG.debug("Successfully retrieved column info for table: {}",
tableName);
return fields;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]