This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 63b1290c [improve]Improve the clarity and detail of the database synchronization logs (#467)
63b1290c is described below

commit 63b1290cec9d4f60f4e1490e8c00afde9de3bea7
Author: Petrichor <[email protected]>
AuthorDate: Wed Aug 14 10:18:50 2024 +0800

    [improve]Improve the clarity and detail of the database synchronization logs (#467)
---
 .../org/apache/doris/flink/tools/cdc/DatabaseSync.java   |  4 +++-
 .../apache/doris/flink/tools/cdc/JdbcSourceSchema.java   | 16 ++++++++++++++--
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 2f672adb..5ae44da6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -118,7 +118,9 @@ public abstract class DatabaseSync {
         DorisSystem dorisSystem = new DorisSystem(options);
 
         List<SourceSchema> schemaList = getSchemaList();
-        Preconditions.checkState(!schemaList.isEmpty(), "No tables to be 
synchronized.");
+        Preconditions.checkState(
+                !schemaList.isEmpty(),
+                "No tables to be synchronized. Please make sure whether the 
tables that need to be synchronized exist in the corresponding database or 
schema.");
 
         if (!StringUtils.isNullOrWhitespaceOnly(database)
                 && !dorisSystem.databaseExists(database)) {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
index b421affb..31cfd1cb 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
@@ -17,7 +17,11 @@
 
 package org.apache.doris.flink.tools.cdc;
 
+import org.apache.flink.util.Preconditions;
+
 import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
@@ -31,6 +35,7 @@ import java.util.List;
  * databases.
  */
 public abstract class JdbcSourceSchema extends SourceSchema {
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcSourceSchema.class);
 
     public JdbcSourceSchema(
             DatabaseMetaData metaData,
@@ -48,7 +53,7 @@ public abstract class JdbcSourceSchema extends SourceSchema {
             DatabaseMetaData metaData, String databaseName, String schemaName, 
String tableName)
             throws SQLException {
         LinkedHashMap<String, FieldSchema> fields = new LinkedHashMap<>();
-        //
+        LOG.debug("Starting to get column info for table: {}", tableName);
         try (ResultSet rs = metaData.getColumns(databaseName, schemaName, 
tableName, null)) {
             while (rs.next()) {
                 String fieldName = rs.getString("COLUMN_NAME");
@@ -63,10 +68,17 @@ public abstract class JdbcSourceSchema extends SourceSchema 
{
                 if (rs.wasNull()) {
                     scale = null;
                 }
-                String dorisTypeStr = convertToDorisType(fieldType, precision, 
scale);
+                String dorisTypeStr = null;
+                try {
+                    dorisTypeStr = convertToDorisType(fieldType, precision, 
scale);
+                } catch (UnsupportedOperationException e) {
+                    throw new UnsupportedOperationException(e + " in table: " 
+ tableName);
+                }
                 fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, 
comment));
             }
         }
+        Preconditions.checkArgument(!fields.isEmpty(), "The column info of {} 
is empty", tableName);
+        LOG.debug("Successfully retrieved column info for table: {}", 
tableName);
         return fields;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to