This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 696f2948fa [Feature][Connector-V2] Add `decimal_type_narrowing` option 
in jdbc (#7461)
696f2948fa is described below

commit 696f2948fa618e84b0c715f44ea7cfdab1d74b5b
Author: Jia Fan <[email protected]>
AuthorDate: Wed Aug 28 14:28:26 2024 +0800

    [Feature][Connector-V2] Add `decimal_type_narrowing` option in jdbc (#7461)
---
 docs/en/connector-v2/source/Jdbc.md                |  25 ++-
 docs/en/connector-v2/source/PostgreSQL.md          |  15 --
 .../seatunnel/jdbc/catalog/JdbcCatalogOptions.java |   6 +-
 .../jdbc/catalog/oracle/OracleCatalog.java         |  25 ++-
 .../jdbc/catalog/oracle/OracleCatalogFactory.java  |   4 +-
 .../jdbc/config/JdbcConnectionConfig.java          |  15 ++
 .../seatunnel/jdbc/config/JdbcOptions.java         |   7 +
 .../seatunnel/jdbc/config/JdbcSourceConfig.java    |   5 +-
 .../dialect/oracle/OracleTypeConverter.java        |  24 ++-
 .../internal/dialect/oracle/OracleTypeMapper.java  |  13 +-
 .../seatunnel/jdbc/utils/JdbcCatalogUtils.java     |   3 +
 .../dialect/oracle/OracleTypeConverterTest.java    | 239 ++++++++++++++++-----
 .../connectors/seatunnel/jdbc/JdbcOracleIT.java    |  24 +++
 .../test/resources/jdbc_oracle_source_to_sink.conf |   4 +-
 .../jdbc_oracle_source_to_sink_use_select1.conf    |   4 +-
 .../jdbc_oracle_source_to_sink_use_select2.conf    |   2 +-
 .../jdbc_oracle_source_to_sink_use_select3.conf    |   2 +-
 ...rce_to_sink_without_decimal_type_narrowing.conf |  82 +++++++
 18 files changed, 407 insertions(+), 92 deletions(-)

diff --git a/docs/en/connector-v2/source/Jdbc.md 
b/docs/en/connector-v2/source/Jdbc.md
index 31257d85b1..44a8a7f3df 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -39,7 +39,7 @@ supports query SQL and can achieve projection effect.
 
 ## Options
 
-|                    name                    |  type   | required |  default 
value  |                                                                        
                                                                                
                                                                                
                                                                            
description                                                                     
                    [...]
+| name                                       | type    | required | default 
value   | description                                                           
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
 
|--------------------------------------------|---------|----------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
 | url                                        | String  | Yes      | -          
     | The URL of the JDBC connection. Refer to a case: 
jdbc:postgresql://localhost/test                                                
                                                                                
                                                                                
                                                                                
                                      [...]
 | driver                                     | String  | Yes      | -          
     | The jdbc class name used to connect to the remote data source, if you 
use MySQL the value is `com.mysql.cj.jdbc.Driver`.                              
                                                                                
                                                                                
                                                                                
                 [...]
@@ -52,6 +52,7 @@ supports query SQL and can achieve projection effect.
 | partition_upper_bound                      | Long    | No       | -          
     | The partition_column max value for scan, if not set SeaTunnel will query 
database get max value.                                                         
                                                                                
                                                                                
                                                                                
              [...]
 | partition_lower_bound                      | Long    | No       | -          
     | The partition_column min value for scan, if not set SeaTunnel will query 
database get min value.                                                         
                                                                                
                                                                                
                                                                                
              [...]
 | partition_num                              | Int     | No       | job 
parallelism | Not recommended for use, The correct approach is to control the 
number of split through `split.size`<br/> How many splits do we need to split 
into, only support positive integer. default value is job parallelism.          
                                                                                
                                                                                
                         [...]
+| decimal_type_narrowing                     | Boolean | No       | true       
     | Decimal type narrowing: if true, the decimal type will be narrowed to 
the int or long type when this causes no loss of precision. Currently only supported for 
Oracle. Please refer to the `decimal_type_narrowing` section below.                  
                                                                                
                                                                                
                   [...]
 | use_select_count                           | Boolean | No       | false      
     | Use select count for table count rather then other methods in dynamic 
chunk split stage. This is currently only available for jdbc-oracle.In this 
scenario, select count directly is used when it is faster to update statistics 
using sql from analysis table                                                   
                                                                                
                      [...]
 | skip_analyze                               | Boolean | No       | false      
     | Skip the analysis of table count in dynamic chunk split stage. This is 
currently only available for jdbc-oracle.In this scenario, you schedule 
analysis table sql to update related table statistics periodically or your 
table data does not change frequently                                           
                                                                                
                             [...]
 | fetch_size                                 | Int     | No       | 0          
     | For queries that return a large number of objects, you can configure the 
row fetch size used in the query to improve performance by reducing the number 
database hits required to satisfy the selection criteria. Zero means use jdbc 
default value.                                                                  
                                                                                
                 [...]
@@ -66,6 +67,28 @@ supports query SQL and can achieve projection effect.
 | split.inverse-sampling.rate                | Int     | No       | 1000       
     | The inverse of the sampling rate used in the sample sharding strategy. 
For example, if this value is set to 1000, it means a 1/1000 sampling rate is 
applied during the sampling process. This option provides flexibility in 
controlling the granularity of the sampling, thus affecting the final number of 
shards. It's especially useful when dealing with very large datasets where a 
lower sampling rate is pref [...]
 | common-options                             |         | No       | -          
     | Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details.                              
                                                                                
                                                                                
                                                                                
                       [...]
 
+### decimal_type_narrowing
+
+Decimal type narrowing: if true, the decimal type will be narrowed to the int 
or long type when this causes no loss of precision. Currently only supported for Oracle.
+
+For example:
+
+decimal_type_narrowing = true
+
+| Oracle        | SeaTunnel |
+|---------------|-----------|
+| NUMBER(1, 0)  | Boolean   |
+| NUMBER(6, 0)  | INT       |
+| NUMBER(10, 0) | BIGINT    |
+
+decimal_type_narrowing = false
+
+| Oracle        | SeaTunnel      |
+|---------------|----------------|
+| NUMBER(1, 0)  | Decimal(1, 0)  |
+| NUMBER(6, 0)  | Decimal(6, 0)  |
+| NUMBER(10, 0) | Decimal(10, 0) |
+
 ## Parallel Reader
 
 The JDBC Source connector supports parallel reading of data from tables. 
SeaTunnel will use certain rules to split the data in the table, which will be 
handed over to readers for reading. The number of readers is determined by the 
`parallelism` option.
diff --git a/docs/en/connector-v2/source/PostgreSQL.md 
b/docs/en/connector-v2/source/PostgreSQL.md
index b6e95c8ad7..5dd922b76f 100644
--- a/docs/en/connector-v2/source/PostgreSQL.md
+++ b/docs/en/connector-v2/source/PostgreSQL.md
@@ -74,21 +74,6 @@ Read external data source data through JDBC.
 
 ## Options
 
-|             Name             |    Type    | Required |     Default     |     
                                                                                
                                       Description                              
                                                                                
              |
-|------------------------------|------------|----------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| url                          | String     | Yes      | -               | The 
URL of the JDBC connection. Refer to a case: 
jdbc:postgresql://localhost:5432/test                                           
                                                                                
                                                 |
-| driver                       | String     | Yes      | -               | The 
jdbc class name used to connect to the remote data source,<br/> if you use 
PostgreSQL the value is `org.postgresql.Driver`.                                
                                                                                
                   |
-| user                         | String     | No       | -               | 
Connection instance user name                                                   
                                                                                
                                                                                
                  |
-| password                     | String     | No       | -               | 
Connection instance password                                                    
                                                                                
                                                                                
                  |
-| query                        | String     | Yes      | -               | 
Query statement                                                                 
                                                                                
                                                                                
                  |
-| connection_check_timeout_sec | Int        | No       | 30              | The 
time in seconds to wait for the database operation used to validate the 
connection to complete                                                          
                                                                                
                      |
-| partition_column             | String     | No       | -               | The 
column name for parallelism's partition, only support numeric type,Only support 
numeric type primary key, and only can config one column.                       
                                                                                
              |
-| partition_lower_bound        | BigDecimal | No       | -               | The 
partition_column min value for scan, if not set SeaTunnel will query database 
get min value.                                                                  
                                                                                
                |
-| partition_upper_bound        | BigDecimal | No       | -               | The 
partition_column max value for scan, if not set SeaTunnel will query database 
get max value.                                                                  
                                                                                
                |
-| partition_num                | Int        | No       | job parallelism | The 
number of partition count, only support positive integer. default value is job 
parallelism                                                                     
                                                                                
               |
-| fetch_size                   | Int        | No       | 0               | For 
queries that return a large number of objects,you can configure<br/> the row 
fetch size used in the query toimprove performance by<br/> reducing the number 
database hits required to satisfy the selection criteria.<br/> Zero means use 
jdbc default value. |
-| properties                   | Map        | No       | -               | 
Additional connection configuration parameters,when properties and URL have the 
same parameters, the priority is determined by the <br/>specific implementation 
of the driver. For example, in MySQL, properties take precedence over the URL.  
                  |
-
 |                    Name                    |    Type    | Required |     
Default     |                                                                   
                                                                                
                                                                                
                                                                  Description   
                                                                                
                  [...]
 
|--------------------------------------------|------------|----------|-----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
 | url                                        | String     | Yes      | -       
        | The URL of the JDBC connection. Refer to a case: 
jdbc:mysql://localhost:3306:3306/test                                           
                                                                                
                                                                                
                                                                                
                                   [...]
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
index c2f2405ee0..c412ca9218 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
 
 public interface JdbcCatalogOptions {
     Option<String> BASE_URL =
@@ -59,7 +60,10 @@ public interface JdbcCatalogOptions {
                                     + "For example, when using OceanBase 
database, you need to set it to 'mysql' or 'oracle'.");
 
     OptionRule.Builder BASE_RULE =
-            OptionRule.builder().required(BASE_URL).required(USERNAME, 
PASSWORD).optional(SCHEMA);
+            OptionRule.builder()
+                    .required(BASE_URL)
+                    .required(USERNAME, PASSWORD)
+                    .optional(SCHEMA, JdbcOptions.DECIMAL_TYPE_NARROWING);
 
     Option<String> TABLE_PREFIX =
             Options.key("tablePrefix")
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
index 5aa6dcd874..ccbbfb509d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
@@ -25,6 +25,7 @@ import 
org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeMapper;
 
@@ -71,13 +72,32 @@ public class OracleCatalog extends AbstractJdbcCatalog {
                     + "ORDER BY \n"
                     + "    cols.column_id \n";
 
+    private boolean decimalTypeNarrowing;
+
     public OracleCatalog(
             String catalogName,
             String username,
             String pwd,
             JdbcUrlUtil.UrlInfo urlInfo,
             String defaultSchema) {
+        this(
+                catalogName,
+                username,
+                pwd,
+                urlInfo,
+                defaultSchema,
+                JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue());
+    }
+
+    public OracleCatalog(
+            String catalogName,
+            String username,
+            String pwd,
+            JdbcUrlUtil.UrlInfo urlInfo,
+            String defaultSchema,
+            boolean decimalTypeNarrowing) {
         super(catalogName, username, pwd, urlInfo, defaultSchema);
+        this.decimalTypeNarrowing = decimalTypeNarrowing;
     }
 
     @Override
@@ -162,7 +182,7 @@ public class OracleCatalog extends AbstractJdbcCatalog {
                         .defaultValue(defaultValue)
                         .comment(columnComment)
                         .build();
-        return OracleTypeConverter.INSTANCE.convert(typeDefine);
+        return new 
OracleTypeConverter(decimalTypeNarrowing).convert(typeDefine);
     }
 
     @Override
@@ -183,7 +203,8 @@ public class OracleCatalog extends AbstractJdbcCatalog {
     @Override
     public CatalogTable getTable(String sqlQuery) throws SQLException {
         Connection defaultConnection = getConnection(defaultUrl);
-        return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new 
OracleTypeMapper());
+        return CatalogUtils.getCatalogTable(
+                defaultConnection, sqlQuery, new 
OracleTypeMapper(decimalTypeNarrowing));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java
index 7c90c79347..2b51d97621 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 
 import com.google.auto.service.AutoService;
@@ -52,7 +53,8 @@ public class OracleCatalogFactory implements CatalogFactory {
                 options.get(JdbcCatalogOptions.USERNAME),
                 options.get(JdbcCatalogOptions.PASSWORD),
                 urlInfo,
-                options.get(JdbcCatalogOptions.SCHEMA));
+                options.get(JdbcCatalogOptions.SCHEMA),
+                options.get(JdbcOptions.DECIMAL_TYPE_NARROWING));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
index dc379bb38a..053ab71a41 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
@@ -43,6 +43,8 @@ public class JdbcConnectionConfig implements Serializable {
 
     public String xaDataSourceClassName;
 
+    public boolean decimalTypeNarrowing = 
JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue();
+
     public int maxCommitAttempts = 
JdbcOptions.MAX_COMMIT_ATTEMPTS.defaultValue();
 
     public int transactionTimeoutSec = 
JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
@@ -81,6 +83,8 @@ public class JdbcConnectionConfig implements Serializable {
         config.getOptional(JdbcOptions.USER).ifPresent(builder::username);
         config.getOptional(JdbcOptions.PASSWORD).ifPresent(builder::password);
         
config.getOptional(JdbcOptions.PROPERTIES).ifPresent(builder::properties);
+        config.getOptional(JdbcOptions.DECIMAL_TYPE_NARROWING)
+                .ifPresent(builder::decimalTypeNarrowing);
         return builder.build();
     }
 
@@ -108,6 +112,10 @@ public class JdbcConnectionConfig implements Serializable {
         return maxRetries;
     }
 
+    public boolean isDecimalTypeNarrowing() {
+        return decimalTypeNarrowing;
+    }
+
     public Optional<String> getUsername() {
         return Optional.ofNullable(username);
     }
@@ -153,6 +161,7 @@ public class JdbcConnectionConfig implements Serializable {
         private boolean autoCommit = JdbcOptions.AUTO_COMMIT.defaultValue();
         private int batchSize = JdbcOptions.BATCH_SIZE.defaultValue();
         private String xaDataSourceClassName;
+        private boolean decimalTypeNarrowing = 
JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue();
         private int maxCommitAttempts = 
JdbcOptions.MAX_COMMIT_ATTEMPTS.defaultValue();
         private int transactionTimeoutSec = 
JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
         private Map<String, String> properties;
@@ -183,6 +192,11 @@ public class JdbcConnectionConfig implements Serializable {
             return this;
         }
 
+        public Builder decimalTypeNarrowing(boolean decimalTypeNarrowing) {
+            this.decimalTypeNarrowing = decimalTypeNarrowing;
+            return this;
+        }
+
         public Builder maxRetries(int maxRetries) {
             this.maxRetries = maxRetries;
             return this;
@@ -267,6 +281,7 @@ public class JdbcConnectionConfig implements Serializable {
             jdbcConnectionConfig.transactionTimeoutSec = 
this.transactionTimeoutSec;
             jdbcConnectionConfig.maxCommitAttempts = this.maxCommitAttempts;
             jdbcConnectionConfig.xaDataSourceClassName = 
this.xaDataSourceClassName;
+            jdbcConnectionConfig.decimalTypeNarrowing = 
this.decimalTypeNarrowing;
             jdbcConnectionConfig.useKerberos = this.useKerberos;
             jdbcConnectionConfig.kerberosPrincipal = this.kerberosPrincipal;
             jdbcConnectionConfig.kerberosKeytabPath = this.kerberosKeytabPath;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index 7f0ec48f36..976650456b 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -101,6 +101,13 @@ public interface JdbcOptions {
                     .defaultValue(false)
                     .withDescription("generate sql using the database table");
 
+    Option<Boolean> DECIMAL_TYPE_NARROWING =
+            Options.key("decimal_type_narrowing")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "decimal type narrowing, if true, the decimal type 
will be narrowed to the int or long type if without loss of precision. Only 
support for Oracle at now.");
+
     Option<String> XA_DATA_SOURCE_CLASS_NAME =
             Options.key("xa_data_source_class_name")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
index 74c7801318..09cc92d70e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
@@ -42,6 +42,7 @@ public class JdbcSourceConfig implements Serializable {
     private double splitEvenDistributionFactorLowerBound;
     private int splitSampleShardingThreshold;
     private int splitInverseSamplingRate;
+    private boolean decimalTypeNarrowing;
 
     public static JdbcSourceConfig of(ReadonlyConfig config) {
         JdbcSourceConfig.Builder builder = JdbcSourceConfig.builder();
@@ -53,7 +54,7 @@ public class JdbcSourceConfig implements Serializable {
         boolean isOldVersion =
                 config.getOptional(JdbcOptions.QUERY).isPresent()
                         && 
config.getOptional(JdbcOptions.PARTITION_COLUMN).isPresent();
-        builder.useDynamicSplitter(isOldVersion ? false : true);
+        builder.useDynamicSplitter(!isOldVersion);
 
         builder.splitSize(config.get(JdbcSourceOptions.SPLIT_SIZE));
         builder.splitEvenDistributionFactorUpperBound(
@@ -64,6 +65,8 @@ public class JdbcSourceConfig implements Serializable {
                 config.get(JdbcSourceOptions.SPLIT_SAMPLE_SHARDING_THRESHOLD));
         
builder.splitInverseSamplingRate(config.get(JdbcSourceOptions.SPLIT_INVERSE_SAMPLING_RATE));
 
+        
builder.decimalTypeNarrowing(config.get(JdbcOptions.DECIMAL_TYPE_NARROWING));
+
         config.getOptional(JdbcSourceOptions.WHERE_CONDITION)
                 .ifPresent(
                         whereConditionClause -> {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverter.java
index d359f3fef0..023fa949cf 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverter.java
@@ -86,6 +86,16 @@ public class OracleTypeConverter implements 
TypeConverter<BasicTypeDefine> {
     public static final long BYTES_4GB = (long) Math.pow(2, 32);
     public static final OracleTypeConverter INSTANCE = new 
OracleTypeConverter();
 
+    private final boolean decimalTypeNarrowing;
+
+    public OracleTypeConverter() {
+        this(true);
+    }
+
+    public OracleTypeConverter(boolean decimalTypeNarrowing) {
+        this.decimalTypeNarrowing = decimalTypeNarrowing;
+    }
+
     @Override
     public String identifier() {
         return DatabaseIdentifier.ORACLE;
@@ -119,12 +129,14 @@ public class OracleTypeConverter implements 
TypeConverter<BasicTypeDefine> {
 
                 if (scale <= 0) {
                     int newPrecision = (int) (precision - scale);
-                    if (newPrecision == 1) {
-                        builder.dataType(BasicType.BOOLEAN_TYPE);
-                    } else if (newPrecision <= 9) {
-                        builder.dataType(BasicType.INT_TYPE);
-                    } else if (newPrecision <= 18) {
-                        builder.dataType(BasicType.LONG_TYPE);
+                    if (newPrecision <= 18 && decimalTypeNarrowing) {
+                        if (newPrecision == 1) {
+                            builder.dataType(BasicType.BOOLEAN_TYPE);
+                        } else if (newPrecision <= 9) {
+                            builder.dataType(BasicType.INT_TYPE);
+                        } else {
+                            builder.dataType(BasicType.LONG_TYPE);
+                        }
                     } else if (newPrecision < 38) {
                         builder.dataType(new DecimalType(newPrecision, 0));
                         builder.columnLength((long) newPrecision);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
index ce5ef8af88..bbdd19af8e 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
 
 import lombok.extern.slf4j.Slf4j;
@@ -31,9 +32,19 @@ import java.util.Arrays;
 @Slf4j
 public class OracleTypeMapper implements JdbcDialectTypeMapper {
 
+    private final boolean decimalTypeNarrowing;
+
+    public OracleTypeMapper() {
+        this(JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue());
+    }
+
+    public OracleTypeMapper(boolean decimalTypeNarrowing) {
+        this.decimalTypeNarrowing = decimalTypeNarrowing;
+    }
+
     @Override
     public Column mappingColumn(BasicTypeDefine typeDefine) {
-        return OracleTypeConverter.INSTANCE.convert(typeDefine);
+        return new OracleTypeConverter(decimalTypeNarrowing).convert(typeDefine);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index 860131041a..1fa379acb4 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalo
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceTableConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -395,6 +396,8 @@ public class JdbcCatalogUtils {
                 .ifPresent(val -> catalogConfig.put(JdbcCatalogOptions.PASSWORD.key(), val));
         Optional.ofNullable(config.getCompatibleMode())
                 .ifPresent(val -> catalogConfig.put(JdbcCatalogOptions.COMPATIBLE_MODE.key(), val));
+        catalogConfig.put(
+                JdbcOptions.DECIMAL_TYPE_NARROWING.key(), config.isDecimalTypeNarrowing());
         return ReadonlyConfig.fromMap(catalogConfig);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverterTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverterTest.java
index 26238bad30..d4a8defdda 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverterTest.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverterTest.java
@@ -36,12 +36,14 @@ import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.or
 
 public class OracleTypeConverterTest {
 
+    private static final OracleTypeConverter INSTANCE = new OracleTypeConverter();
+
     @Test
     public void testConvertUnsupported() {
         BasicTypeDefine<Object> typeDefine =
                 BasicTypeDefine.builder().name("test").columnType("aaa").dataType("aaa").build();
         try {
-            OracleTypeConverter.INSTANCE.convert(typeDefine);
+            INSTANCE.convert(typeDefine);
             Assertions.fail();
         } catch (SeaTunnelRuntimeException e) {
             // ignore
@@ -50,6 +52,113 @@ public class OracleTypeConverterTest {
         }
     }
 
+    @Test
+    public void testConvertNumberWithoutDecimalTypeNarrowing() {
+        OracleTypeConverter converter = new OracleTypeConverter(false);
+
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("number")
+                        .dataType("number")
+                        .build();
+        Column column = converter.convert(typeDefine);
+
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(new DecimalType(38, 18), column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("number(38,127)")
+                        .dataType("number")
+                        .precision(38L)
+                        .scale(127)
+                        .build();
+        column = converter.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(new DecimalType(38, 18), column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("number")
+                        .dataType("number")
+                        .scale(0)
+                        .build();
+        column = converter.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("number(1,0)")
+                        .dataType("number")
+                        .precision(1L)
+                        .scale(0)
+                        .build();
+        column = converter.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(new DecimalType(1, 0), column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("number(8,0)")
+                        .dataType("number")
+                        .precision(8L)
+                        .scale(0)
+                        .build();
+        column = converter.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(new DecimalType(8, 0), column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("number(18,0)")
+                        .dataType("number")
+                        .precision(18L)
+                        .scale(0)
+                        .build();
+        column = converter.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(new DecimalType(18, 0), column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("number(38,0)")
+                        .dataType("number")
+                        .precision(38L)
+                        .scale(0)
+                        .build();
+        column = converter.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("number(39,0)")
+                        .dataType("number")
+                        .precision(39L)
+                        .scale(0)
+                        .build();
+        column = converter.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
+        Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+    }
+
     @Test
     public void testConvertInteger() {
         BasicTypeDefine<Object> typeDefine =
@@ -58,10 +167,24 @@ public class OracleTypeConverterTest {
                         .columnType("integer")
                         .dataType("integer")
                         .build();
-        Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+
+        // generated by int/smallint type in oracle create table sql
+        BasicTypeDefine<Object> numberTypeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("number")
+                        .dataType("number")
+                        .precision(null)
+                        .scale(0)
+                        .build();
+        column = INSTANCE.convert(numberTypeDefine);
+        Assertions.assertEquals(numberTypeDefine.getName(), column.getName());
+        Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
+        Assertions.assertEquals(numberTypeDefine.getColumnType(), 
column.getSourceType());
     }
 
     @Test
@@ -72,7 +195,7 @@ public class OracleTypeConverterTest {
                         .columnType("number")
                         .dataType("number")
                         .build();
-        Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(new DecimalType(38, 18), column.getDataType());
@@ -86,7 +209,7 @@ public class OracleTypeConverterTest {
                         .precision(38L)
                         .scale(127)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(new DecimalType(38, 18), column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -98,7 +221,7 @@ public class OracleTypeConverterTest {
                         .dataType("number")
                         .scale(0)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -111,7 +234,7 @@ public class OracleTypeConverterTest {
                         .precision(1L)
                         .scale(0)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.BOOLEAN_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -124,7 +247,7 @@ public class OracleTypeConverterTest {
                         .precision(8L)
                         .scale(0)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -137,7 +260,7 @@ public class OracleTypeConverterTest {
                         .precision(18L)
                         .scale(0)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -150,7 +273,7 @@ public class OracleTypeConverterTest {
                         .precision(38L)
                         .scale(0)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -163,7 +286,7 @@ public class OracleTypeConverterTest {
                         .precision(39L)
                         .scale(0)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -177,7 +300,7 @@ public class OracleTypeConverterTest {
                         .columnType("float")
                         .dataType("float")
                         .build();
-        Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(new DecimalType(38, 18), column.getDataType());
@@ -189,7 +312,7 @@ public class OracleTypeConverterTest {
                         .columnType("binary_float")
                         .dataType("binary_float")
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.FLOAT_TYPE, column.getDataType());
@@ -197,7 +320,7 @@ public class OracleTypeConverterTest {
 
         typeDefine =
                 
BasicTypeDefine.builder().name("test").columnType("real").dataType("real").build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.FLOAT_TYPE, column.getDataType());
@@ -212,7 +335,7 @@ public class OracleTypeConverterTest {
                         .columnType("binary_double")
                         .dataType("binary_double")
                         .build();
-        Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.DOUBLE_TYPE, column.getDataType());
@@ -228,7 +351,7 @@ public class OracleTypeConverterTest {
                         .dataType("char")
                         .length(1L)
                         .build();
-        Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -242,7 +365,7 @@ public class OracleTypeConverterTest {
                         .dataType("nchar")
                         .length(1L)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -256,7 +379,7 @@ public class OracleTypeConverterTest {
                         .dataType("varchar")
                         .length(1L)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -270,7 +393,7 @@ public class OracleTypeConverterTest {
                         .dataType("varchar2")
                         .length(1L)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -284,7 +407,7 @@ public class OracleTypeConverterTest {
                         .dataType("nvarchar2")
                         .length(1L)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -297,7 +420,7 @@ public class OracleTypeConverterTest {
                         .columnType("rowid")
                         .dataType("rowid")
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -311,7 +434,7 @@ public class OracleTypeConverterTest {
                         .dataType("xmltype")
                         .length(1L)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -325,7 +448,7 @@ public class OracleTypeConverterTest {
                         .dataType("sys.xmltype")
                         .length(1L)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -339,7 +462,7 @@ public class OracleTypeConverterTest {
                         .dataType("long")
                         .length(1L)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -348,7 +471,7 @@ public class OracleTypeConverterTest {
 
         typeDefine =
                 
BasicTypeDefine.builder().name("test").columnType("clob").dataType("clob").build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -361,7 +484,7 @@ public class OracleTypeConverterTest {
                         .columnType("nclob")
                         .dataType("nclob")
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -373,7 +496,7 @@ public class OracleTypeConverterTest {
     public void testConvertBytes() {
         BasicTypeDefine<Object> typeDefine =
                 
BasicTypeDefine.builder().name("test").columnType("blob").dataType("blob").build();
-        Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, 
column.getDataType());
@@ -382,7 +505,7 @@ public class OracleTypeConverterTest {
 
         typeDefine =
                 
BasicTypeDefine.builder().name("test").columnType("raw").dataType("raw").build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, 
column.getDataType());
@@ -396,7 +519,7 @@ public class OracleTypeConverterTest {
                         .dataType("raw")
                         .length(10L)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, 
column.getDataType());
@@ -409,7 +532,7 @@ public class OracleTypeConverterTest {
                         .columnType("long raw")
                         .dataType("long raw")
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, 
column.getDataType());
@@ -421,7 +544,7 @@ public class OracleTypeConverterTest {
     public void testConvertDatetime() {
         BasicTypeDefine<Object> typeDefine =
                 
BasicTypeDefine.builder().name("test").columnType("date").dataType("date").build();
-        Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
@@ -435,7 +558,7 @@ public class OracleTypeConverterTest {
                         .dataType("timestamp")
                         .scale(6)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
@@ -449,7 +572,7 @@ public class OracleTypeConverterTest {
                         .dataType("timestamp with time zone")
                         .scale(6)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
@@ -463,7 +586,7 @@ public class OracleTypeConverterTest {
                         .dataType("timestamp with local time zone")
                         .scale(6)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
 
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
@@ -482,7 +605,7 @@ public class OracleTypeConverterTest {
                         null,
                         null);
         try {
-            OracleTypeConverter.INSTANCE.reconvert(column);
+            INSTANCE.reconvert(column);
             Assertions.fail();
         } catch (SeaTunnelRuntimeException e) {
             // ignore
@@ -496,7 +619,7 @@ public class OracleTypeConverterTest {
         Column column =
                 
PhysicalColumn.builder().name("test").dataType(BasicType.BOOLEAN_TYPE).build();
 
-        BasicTypeDefine typeDefine = 
OracleTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(
                 String.format("%s(%s)", OracleTypeConverter.ORACLE_NUMBER, 1),
@@ -509,7 +632,7 @@ public class OracleTypeConverterTest {
     public void testReconvertByte() {
         Column column = 
PhysicalColumn.builder().name("test").dataType(BasicType.BYTE_TYPE).build();
 
-        BasicTypeDefine typeDefine = 
OracleTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, 
typeDefine.getColumnType());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, 
typeDefine.getDataType());
@@ -520,7 +643,7 @@ public class OracleTypeConverterTest {
         Column column =
                 
PhysicalColumn.builder().name("test").dataType(BasicType.SHORT_TYPE).build();
 
-        BasicTypeDefine typeDefine = 
OracleTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, 
typeDefine.getColumnType());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, 
typeDefine.getDataType());
@@ -530,7 +653,7 @@ public class OracleTypeConverterTest {
     public void testReconvertInt() {
         Column column = 
PhysicalColumn.builder().name("test").dataType(BasicType.INT_TYPE).build();
 
-        BasicTypeDefine typeDefine = 
OracleTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, 
typeDefine.getColumnType());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, 
typeDefine.getDataType());
@@ -540,7 +663,7 @@ public class OracleTypeConverterTest {
     public void testReconvertLong() {
         Column column = 
PhysicalColumn.builder().name("test").dataType(BasicType.LONG_TYPE).build();
 
-        BasicTypeDefine typeDefine = 
OracleTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, 
typeDefine.getColumnType());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, 
typeDefine.getDataType());
@@ -551,7 +674,7 @@ public class OracleTypeConverterTest {
         Column column =
                 
PhysicalColumn.builder().name("test").dataType(BasicType.FLOAT_TYPE).build();
 
-        BasicTypeDefine typeDefine = 
OracleTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(
                 OracleTypeConverter.ORACLE_BINARY_FLOAT, 
typeDefine.getColumnType());
@@ -563,7 +686,7 @@ public class OracleTypeConverterTest {
         Column column =
                 
PhysicalColumn.builder().name("test").dataType(BasicType.DOUBLE_TYPE).build();
 
-        BasicTypeDefine typeDefine = 
OracleTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(
                 OracleTypeConverter.ORACLE_BINARY_DOUBLE, 
typeDefine.getColumnType());
@@ -575,7 +698,7 @@ public class OracleTypeConverterTest {
         Column column =
                 PhysicalColumn.builder().name("test").dataType(new 
DecimalType(0, 0)).build();
 
-        BasicTypeDefine typeDefine = 
OracleTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(
                 String.format(
@@ -588,7 +711,7 @@ public class OracleTypeConverterTest {
 
         column = PhysicalColumn.builder().name("test").dataType(new 
DecimalType(10, 2)).build();
 
-        typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(
                 String.format("%s(%s,%s)", OracleTypeConverter.ORACLE_NUMBER, 
10, 2),
@@ -605,7 +728,7 @@ public class OracleTypeConverterTest {
                         .columnLength(null)
                         .build();
 
-        BasicTypeDefine typeDefine = 
OracleTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_BLOB, 
typeDefine.getColumnType());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_BLOB, 
typeDefine.getDataType());
@@ -617,7 +740,7 @@ public class OracleTypeConverterTest {
                         .columnLength(2000L)
                         .build();
 
-        typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(
                 String.format("%s(%s)", OracleTypeConverter.ORACLE_RAW, 
column.getColumnLength()),
@@ -631,7 +754,7 @@ public class OracleTypeConverterTest {
                         .columnLength(BYTES_2GB)
                         .build();
 
-        typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_BLOB, 
typeDefine.getColumnType());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_BLOB, 
typeDefine.getDataType());
@@ -643,7 +766,7 @@ public class OracleTypeConverterTest {
                         .columnLength(BYTES_2GB + 1)
                         .build();
 
-        typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_BLOB, 
typeDefine.getColumnType());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_BLOB, 
typeDefine.getDataType());
@@ -658,7 +781,7 @@ public class OracleTypeConverterTest {
                         .columnLength(null)
                         .build();
 
-        BasicTypeDefine typeDefine = 
OracleTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals("VARCHAR2(4000)", typeDefine.getColumnType());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_VARCHAR2, 
typeDefine.getDataType());
@@ -670,7 +793,7 @@ public class OracleTypeConverterTest {
                         .columnLength(2000L)
                         .build();
 
-        typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(
                 String.format(
@@ -685,7 +808,7 @@ public class OracleTypeConverterTest {
                         .columnLength(4000L)
                         .build();
 
-        typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(
                 String.format(
@@ -700,7 +823,7 @@ public class OracleTypeConverterTest {
                         .columnLength(40001L)
                         .build();
 
-        typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_CLOB, 
typeDefine.getColumnType());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_CLOB, 
typeDefine.getDataType());
@@ -714,7 +837,7 @@ public class OracleTypeConverterTest {
                         .dataType(LocalTimeType.LOCAL_DATE_TYPE)
                         .build();
 
-        BasicTypeDefine typeDefine = 
OracleTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_DATE, 
typeDefine.getColumnType());
         Assertions.assertEquals(OracleTypeConverter.ORACLE_DATE, 
typeDefine.getDataType());
@@ -728,7 +851,7 @@ public class OracleTypeConverterTest {
                         .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
                         .build();
 
-        BasicTypeDefine typeDefine = 
OracleTypeConverter.INSTANCE.reconvert(column);
+        BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(
                 OracleTypeConverter.ORACLE_TIMESTAMP_WITH_LOCAL_TIME_ZONE,
@@ -744,7 +867,7 @@ public class OracleTypeConverterTest {
                         .scale(3)
                         .build();
 
-        typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+        typeDefine = INSTANCE.reconvert(column);
         Assertions.assertEquals(column.getName(), typeDefine.getName());
         Assertions.assertEquals(
                 String.format("TIMESTAMP(%s) WITH LOCAL TIME ZONE", 
column.getScale()),
@@ -765,7 +888,7 @@ public class OracleTypeConverterTest {
                         .precision(38L)
                         .scale(-1)
                         .build();
-        Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        Column column = INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -778,7 +901,7 @@ public class OracleTypeConverterTest {
                         .precision(5L)
                         .scale(-2)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -791,7 +914,7 @@ public class OracleTypeConverterTest {
                         .precision(9L)
                         .scale(-2)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
@@ -804,7 +927,7 @@ public class OracleTypeConverterTest {
                         .precision(14L)
                         .scale(-11)
                         .build();
-        column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+        column = INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
         Assertions.assertEquals(new DecimalType(25, 0), column.getDataType());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index b7c4a54b59..9cd130219a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -25,11 +25,15 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleURLPa
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.OracleContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -77,6 +81,9 @@ public class JdbcOracleIT extends AbstractJdbcIT {
                     + "    VARCHAR_10_COL                varchar2(10),\n"
                     + "    CHAR_10_COL                   char(10),\n"
                     + "    CLOB_COL                      clob,\n"
+                    + "    NUMBER_1             number(1),\n"
+                    + "    NUMBER_6             number(6),\n"
+                    + "    NUMBER_10             number(10),\n"
                     + "    NUMBER_3_SF_2_DP              number(3, 2),\n"
                     + "    NUMBER_7_SF_N2_DP             number(7, -2),\n"
                     + "    INTEGER_COL                   integer,\n"
@@ -97,6 +104,9 @@ public class JdbcOracleIT extends AbstractJdbcIT {
                     + "    VARCHAR_10_COL                varchar2(10),\n"
                     + "    CHAR_10_COL                   char(10),\n"
                     + "    CLOB_COL                      clob,\n"
+                    + "    NUMBER_1             number(1),\n"
+                    + "    NUMBER_6             number(6),\n"
+                    + "    NUMBER_10             number(10),\n"
                     + "    NUMBER_3_SF_2_DP              number(3, 2),\n"
                     + "    NUMBER_7_SF_N2_DP             number(7, -2),\n"
                     + "    INTEGER_COL                   integer,\n"
@@ -115,6 +125,9 @@ public class JdbcOracleIT extends AbstractJdbcIT {
                 "VARCHAR_10_COL",
                 "CHAR_10_COL",
                 "CLOB_COL",
+                "NUMBER_1",
+                "NUMBER_6",
+                "NUMBER_10",
                 "NUMBER_3_SF_2_DP",
                 "NUMBER_7_SF_N2_DP",
                 "INTEGER_COL",
@@ -148,6 +161,14 @@ public class JdbcOracleIT extends AbstractJdbcIT {
         dialect.sampleDataFromColumn(connection, table, "INTEGER_COL", 1, 
1024);
     }
 
+    @TestTemplate
+    public void testOracleWithoutDecimalTypeNarrowing(TestContainer container) 
throws Exception {
+        Container.ExecResult execResult =
+                container.executeJob(
+                        
"/jdbc_oracle_source_to_sink_without_decimal_type_narrowing.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
     @Override
     JdbcCase getJdbcCase() {
         Map<String, String> containerEnv = new HashMap<>();
@@ -207,6 +228,9 @@ public class JdbcOracleIT extends AbstractJdbcIT {
                                 String.format("f%s", i),
                                 String.format("f%s", i),
                                 String.format("f%s", i),
+                                1,
+                                i * 10,
+                                i * 1000,
                                 BigDecimal.valueOf(1.1),
                                 BigDecimal.valueOf(2400),
                                 i,
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf
index d956894c34..4df8c7b993 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf
@@ -30,7 +30,7 @@ source {
     url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
     user = testUser
     password = testPassword
-    query = "SELECT 
VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL
 FROM E2E_TABLE_SOURCE"
+    query = "SELECT 
VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL
 FROM E2E_TABLE_SOURCE"
     properties {
        database.oracle.jdbc.timezoneAsRegion = "false"
     }
@@ -46,7 +46,7 @@ sink {
     url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
     user = testUser
     password = testPassword
-    query = "INSERT INTO E2E_TABLE_SINK 
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
 VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+    query = "INSERT INTO E2E_TABLE_SINK 
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
 VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
     properties {
        database.oracle.jdbc.timezoneAsRegion = "false"
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf
index 8a0c831044..1988b48872 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf
@@ -31,7 +31,7 @@ source {
     user = testUser
     password = testPassword
     use_select_count = true
-    query = "SELECT 
VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL
 FROM E2E_TABLE_SOURCE"
+    query = "SELECT 
VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL
 FROM E2E_TABLE_SOURCE"
     properties {
        database.oracle.jdbc.timezoneAsRegion = "false"
     }
@@ -47,7 +47,7 @@ sink {
     url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
     user = testUser
     password = testPassword
-    query = "INSERT INTO E2E_TABLE_SINK 
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
 VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+    query = "INSERT INTO E2E_TABLE_SINK 
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
 VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
     properties {
        database.oracle.jdbc.timezoneAsRegion = "false"
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf
index ebebdb5505..4d01da5c72 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf
@@ -47,7 +47,7 @@ sink {
     url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
     user = testUser
     password = testPassword
-    query = "INSERT INTO E2E_TABLE_SINK 
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
 VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+    query = "INSERT INTO E2E_TABLE_SINK 
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
 VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
     properties {
        database.oracle.jdbc.timezoneAsRegion = "false"
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf
index d00ce9b643..94a850fdd0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf
@@ -48,7 +48,7 @@ sink {
     url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
     user = testUser
     password = testPassword
-    query = "INSERT INTO E2E_TABLE_SINK 
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
 VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+    query = "INSERT INTO E2E_TABLE_SINK 
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
 VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
     properties {
        database.oracle.jdbc.timezoneAsRegion = "false"
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_without_decimal_type_narrowing.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_without_decimal_type_narrowing.conf
new file mode 100644
index 0000000000..58e98f5def
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_without_decimal_type_narrowing.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    driver = oracle.jdbc.driver.OracleDriver
+    url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
+    user = testUser
+    password = testPassword
+    decimal_type_narrowing = false
+    query = "SELECT NUMBER_1,NUMBER_6,NUMBER_10 FROM E2E_TABLE_SOURCE"
+    properties {
+       database.oracle.jdbc.timezoneAsRegion = "false"
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 20000
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 20000
+        }
+      ],
+      field_rules = [
+        {
+          field_name = NUMBER_1
+          field_type = "decimal(1, 0)"
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = NUMBER_6
+          field_type = "decimal(6, 0)"
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = NUMBER_10
+          field_type = "decimal(10, 0)"
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+
+}

Reply via email to