This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6b8fd3264b51835e5bb8eede3f8e9bcce3cd8a22 Author: Dawid Wysakowicz <[email protected]> AuthorDate: Thu Dec 19 16:48:14 2019 +0100 [hotfix][connectors/kafka, table, tests] Fix types in schema ddl in tests --- .../kafka/KafkaTableSourceSinkFactoryTestBase.java | 32 ++++++++++------------ .../connectors/kafka/KafkaTableTestBase.java | 6 ++-- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java index 755630a..d46c69e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java @@ -110,7 +110,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { // prepare parameters for Kafka table source final TableSchema schema = TableSchema.builder() .field(FRUIT_NAME, DataTypes.STRING()) - .field(COUNT, DataTypes.DECIMAL(10, 3)) + .field(COUNT, DataTypes.DECIMAL(38, 18)) .field(EVENT_TIME, DataTypes.TIMESTAMP(3)) .field(PROC_TIME, DataTypes.TIMESTAMP(3)) .build(); @@ -128,13 +128,12 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0); specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1); - TableSchema tableSchema = TableSchema.builder() - .field(NAME, DataTypes.STRING()) - .field(COUNT, DataTypes.DECIMAL(10, 3)) - .field(TIME, DataTypes.TIMESTAMP(3)) - .build(); final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema( - tableSchema.toRowType() + 
TableSchema.builder() + .field(NAME, DataTypes.STRING()) + .field(COUNT, DataTypes.DECIMAL(38, 18)) + .field(TIME, DataTypes.TIMESTAMP(3)) + .build().toRowType() ); final KafkaTableSourceBase expected = getExpectedKafkaTableSource( @@ -148,7 +147,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { StartupMode.SPECIFIC_OFFSETS, specificOffsets); - TableSourceValidation.validateTableSource(expected, tableSchema); + TableSourceValidation.validateTableSource(expected, schema); // construct table source using descriptors and table source factory final Map<String, String> propertiesMap = new HashMap<>(); @@ -178,7 +177,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { // prepare parameters for Kafka table source final TableSchema schema = TableSchema.builder() .field(FRUIT_NAME, DataTypes.STRING()) - .field(COUNT, DataTypes.DECIMAL(10, 3)) + .field(COUNT, DataTypes.DECIMAL(38, 18)) .field(EVENT_TIME, DataTypes.TIMESTAMP(3)) .field(PROC_TIME, DataTypes.TIMESTAMP(3)) .build(); @@ -196,13 +195,12 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0); specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1); - TableSchema tableSchema = TableSchema.builder() - .field(NAME, DataTypes.STRING()) - .field(COUNT, DataTypes.DECIMAL(10, 3)) - .field(TIME, DataTypes.TIMESTAMP(3)) - .build(); final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema( - tableSchema.toRowType() + TableSchema.builder() + .field(NAME, DataTypes.STRING()) + .field(COUNT, DataTypes.DECIMAL(38, 18)) + .field(TIME, DataTypes.TIMESTAMP(3)) + .build().toRowType() ); final KafkaTableSourceBase expected = getExpectedKafkaTableSource( @@ -216,7 +214,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { StartupMode.SPECIFIC_OFFSETS, specificOffsets); - 
TableSourceValidation.validateTableSource(expected, tableSchema); + TableSourceValidation.validateTableSource(expected, schema); // construct table source using descriptors and table source factory final Map<String, String> legacyPropertiesMap = new HashMap<>(); @@ -267,7 +265,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { .withSchema( new Schema() .field(FRUIT_NAME, DataTypes.STRING()).from(NAME) - .field(COUNT, DataTypes.DECIMAL(10, 3)) // no from so it must match with the input + .field(COUNT, DataTypes.DECIMAL(38, 18)) // no from so it must match with the input .field(EVENT_TIME, DataTypes.TIMESTAMP(3)).rowtime( new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending()) .field(PROC_TIME, DataTypes.TIMESTAMP(3)).proctime()) diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java index 1731bb8..26837ad 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java @@ -72,7 +72,7 @@ public abstract class KafkaTableTestBase extends KafkaTestBase { // TODO: use DDL to register Kafka once FLINK-15282 is fixed. 
// we have to register into Catalog manually because it will use Calcite's ParameterScope TableSchema schema = TableSchema.builder() - .field("price", DataTypes.DECIMAL(10, 2)) + .field("price", DataTypes.DECIMAL(38, 18)) .field("currency", DataTypes.STRING()) .field("log_ts", DataTypes.TIMESTAMP(3)) .field("ts", DataTypes.TIMESTAMP(3), "log_ts + INTERVAL '1' SECOND") @@ -101,7 +101,7 @@ public abstract class KafkaTableTestBase extends KafkaTestBase { // TODO: use the following DDL instead of the preceding code to register Kafka // String ddl = "CREATE TABLE kafka (\n" + -// " price DECIMAL(10, 2),\n" + +// " price DECIMAL(38, 18),\n" + // " currency STRING,\n" + // " log_ts TIMESTAMP(3),\n" + // " ts AS log_ts + INTERVAL '1' SECOND,\n" + @@ -138,7 +138,7 @@ public abstract class KafkaTableTestBase extends KafkaTestBase { " CAST(TUMBLE_END(ts, INTERVAL '5' SECOND) AS VARCHAR),\n" + " CAST(MAX(ts) AS VARCHAR),\n" + " COUNT(*),\n" + - " MAX(price)\n" + + " CAST(MAX(price) AS DECIMAL(10, 2))\n" + "FROM kafka\n" + "GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)";
