This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6b8fd3264b51835e5bb8eede3f8e9bcce3cd8a22
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Dec 19 16:48:14 2019 +0100

    [hotfix][connectors/kafka, table, tests] Fix types in schema DDL in tests
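
    Why DECIMAL(38, 18): the affected tests derive the physical row type
    from legacy TypeInformation, and the legacy converter maps BigDecimal
    to DECIMAL(38, 18), so any other precision/scale in the declared
    schema fails validation. A minimal sketch of that mapping, assuming
    TypeConversions.fromLegacyInfoToDataType is the legacy conversion
    path taken here (shown for illustration only, not part of this
    change):

        import org.apache.flink.api.common.typeinfo.Types;
        import org.apache.flink.table.types.DataType;
        import org.apache.flink.table.types.utils.TypeConversions;

        // Legacy TypeInformation for java.math.BigDecimal carries no
        // precision/scale, so the converter falls back to DECIMAL(38, 18).
        DataType physical = TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC);
        // physical is expected to be DECIMAL(38, 18); a schema field
        // declared as DECIMAL(10, 3) therefore does not match the
        // produced type and TableSourceValidation rejects it.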
---
 .../kafka/KafkaTableSourceSinkFactoryTestBase.java | 32 ++++++++++------------
 .../connectors/kafka/KafkaTableTestBase.java       |  6 ++--
 2 files changed, 18 insertions(+), 20 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index 755630a..d46c69e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -110,7 +110,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
                // prepare parameters for Kafka table source
                final TableSchema schema = TableSchema.builder()
                        .field(FRUIT_NAME, DataTypes.STRING())
-                       .field(COUNT, DataTypes.DECIMAL(10, 3))
+                       .field(COUNT, DataTypes.DECIMAL(38, 18))
                        .field(EVENT_TIME, DataTypes.TIMESTAMP(3))
                        .field(PROC_TIME, DataTypes.TIMESTAMP(3))
                        .build();
@@ -128,13 +128,12 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
                specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
                specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
 
-               TableSchema tableSchema = TableSchema.builder()
-                       .field(NAME, DataTypes.STRING())
-                       .field(COUNT, DataTypes.DECIMAL(10, 3))
-                       .field(TIME, DataTypes.TIMESTAMP(3))
-                       .build();
                final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
-                       tableSchema.toRowType()
+                       TableSchema.builder()
+                               .field(NAME, DataTypes.STRING())
+                               .field(COUNT, DataTypes.DECIMAL(38, 18))
+                               .field(TIME, DataTypes.TIMESTAMP(3))
+                               .build().toRowType()
                );
 
                final KafkaTableSourceBase expected = getExpectedKafkaTableSource(
@@ -148,7 +147,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
                        StartupMode.SPECIFIC_OFFSETS,
                        specificOffsets);
 
-               TableSourceValidation.validateTableSource(expected, tableSchema);
+               TableSourceValidation.validateTableSource(expected, schema);
 
                // construct table source using descriptors and table source factory
                final Map<String, String> propertiesMap = new HashMap<>();
@@ -178,7 +177,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
                // prepare parameters for Kafka table source
                final TableSchema schema = TableSchema.builder()
                        .field(FRUIT_NAME, DataTypes.STRING())
-                       .field(COUNT, DataTypes.DECIMAL(10, 3))
+                       .field(COUNT, DataTypes.DECIMAL(38, 18))
                        .field(EVENT_TIME, DataTypes.TIMESTAMP(3))
                        .field(PROC_TIME, DataTypes.TIMESTAMP(3))
                        .build();
@@ -196,13 +195,12 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
                specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
                specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
 
-               TableSchema tableSchema = TableSchema.builder()
-                       .field(NAME, DataTypes.STRING())
-                       .field(COUNT, DataTypes.DECIMAL(10, 3))
-                       .field(TIME, DataTypes.TIMESTAMP(3))
-                       .build();
                final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
-                       tableSchema.toRowType()
+                       TableSchema.builder()
+                               .field(NAME, DataTypes.STRING())
+                               .field(COUNT, DataTypes.DECIMAL(38, 18))
+                               .field(TIME, DataTypes.TIMESTAMP(3))
+                               .build().toRowType()
                );
 
                final KafkaTableSourceBase expected = getExpectedKafkaTableSource(
@@ -216,7 +214,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
                        StartupMode.SPECIFIC_OFFSETS,
                        specificOffsets);
 
-               TableSourceValidation.validateTableSource(expected, tableSchema);
+               TableSourceValidation.validateTableSource(expected, schema);
 
                // construct table source using descriptors and table source factory
                final Map<String, String> legacyPropertiesMap = new HashMap<>();
@@ -267,7 +265,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
                                .withSchema(
                                        new Schema()
                                                .field(FRUIT_NAME, DataTypes.STRING()).from(NAME)
-                                               .field(COUNT, DataTypes.DECIMAL(10, 3)) // no from so it must match with the input
+                                               .field(COUNT, DataTypes.DECIMAL(38, 18)) // no from so it must match with the input
                                                .field(EVENT_TIME, DataTypes.TIMESTAMP(3)).rowtime(
                                                        new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending())
                                                .field(PROC_TIME, DataTypes.TIMESTAMP(3)).proctime())
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java
index 1731bb8..26837ad 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java
@@ -72,7 +72,7 @@ public abstract class KafkaTableTestBase extends KafkaTestBase {
                // TODO: use DDL to register Kafka once FLINK-15282 is fixed.
                //  we have to register into Catalog manually because it will use Calcite's ParameterScope
                TableSchema schema = TableSchema.builder()
-                       .field("price", DataTypes.DECIMAL(10, 2))
+                       .field("price", DataTypes.DECIMAL(38, 18))
                        .field("currency", DataTypes.STRING())
                        .field("log_ts", DataTypes.TIMESTAMP(3))
                        .field("ts", DataTypes.TIMESTAMP(3), "log_ts + INTERVAL 
'1' SECOND")
@@ -101,7 +101,7 @@ public abstract class KafkaTableTestBase extends KafkaTestBase {
 
                // TODO: use the following DDL instead of the preceding code to register Kafka
 //             String ddl = "CREATE TABLE kafka (\n" +
-//                     "  price DECIMAL(10, 2),\n" +
+//                     "  price DECIMAL(38, 18),\n" +
 //                     "  currency STRING,\n" +
 //                     "  log_ts TIMESTAMP(3),\n" +
 //                     "  ts AS log_ts + INTERVAL '1' SECOND,\n" +
@@ -138,7 +138,7 @@ public abstract class KafkaTableTestBase extends KafkaTestBase {
                        "  CAST(TUMBLE_END(ts, INTERVAL '5' SECOND) AS 
VARCHAR),\n" +
                        "  CAST(MAX(ts) AS VARCHAR),\n" +
                        "  COUNT(*),\n" +
-                       "  MAX(price)\n" +
+                       "  CAST(MAX(price) AS DECIMAL(10, 2))\n" +
                        "FROM kafka\n" +
                        "GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)";
 
