[
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534878#comment-16534878
]
ASF GitHub Bot commented on FLINK-8558:
---------------------------------------
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6264#discussion_r200653181
--- Diff:
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.descriptors.Kafka;
+import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
+import org.apache.flink.table.formats.utils.TestDeserializationSchema;
+import org.apache.flink.table.formats.utils.TestTableFormat;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+import org.apache.flink.table.sources.tsextractors.ExistingField;
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Abstract test base for {@link KafkaTableSourceFactory}.
+ */
+public abstract class KafkaTableSourceFactoryTestBase {
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testTableSource() {
+
+ // prepare parameters for Kafka table source
+
+ final TableSchema schema = TableSchema.builder()
+ .field("fruit-name", Types.STRING())
+ .field("count", Types.DECIMAL())
+ .field("event-time", Types.SQL_TIMESTAMP())
+ .field("proc-time", Types.SQL_TIMESTAMP())
+ .build();
+
+ final String proctimeAttribute = "proc-time";
+
+ final List<RowtimeAttributeDescriptor>
rowtimeAttributeDescriptors = Collections.singletonList(
+ new RowtimeAttributeDescriptor("event-time", new
ExistingField("time"), new AscendingTimestamps()));
+
+ final Map<String, String> fieldMapping = new HashMap<>();
+ fieldMapping.put("fruit-name", "name");
+ fieldMapping.put("count", "count");
+
+ final String topic = "myTopic";
+
+ final Properties kafkaProperties = new Properties();
+ kafkaProperties.setProperty("zookeeper.connect", "dummy");
+ kafkaProperties.setProperty("group.id", "dummy");
+
+ final Map<KafkaTopicPartition, Long> specificOffsets = new
HashMap<>();
+ specificOffsets.put(new KafkaTopicPartition(topic, 0), 100L);
+ specificOffsets.put(new KafkaTopicPartition(topic, 1), 123L);
+
+ final TestDeserializationSchema deserializationSchema = new
TestDeserializationSchema(
+ TableSchema.builder()
+ .field("name", Types.STRING())
+ .field("count", Types.DECIMAL())
+ .field("time", Types.SQL_TIMESTAMP())
+ .build()
+ .toRowType()
+ );
+
+ final StartupMode startupMode = StartupMode.SPECIFIC_OFFSETS;
+
+ final KafkaTableSource expected = getKafkaTableSource(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ fieldMapping,
+ topic,
+ kafkaProperties,
+ deserializationSchema,
+ startupMode,
+ specificOffsets);
+
+ // construct table source using descriptors and table source
factory
+
+ final Map<Integer, Long> offsets = new HashMap<>();
+ offsets.put(0, 100L);
+ offsets.put(1, 123L);
+
+ final TestTableSourceDescriptor testDesc = new
TestTableSourceDescriptor(
+ new Kafka()
+ .version(getKafkaVersion())
+ .topic(topic)
+ .properties(kafkaProperties)
+ .startFromSpecificOffsets(offsets))
+ .addFormat(new TestTableFormat())
+ .addSchema(
+ new Schema()
+ .field("fruit-name",
Types.STRING()).from("name")
+ .field("count", Types.DECIMAL()) // no
from so it must match with the input
+ .field("event-time",
Types.SQL_TIMESTAMP()).rowtime(
+ new
Rowtime().timestampsFromField("time").watermarksPeriodicAscending())
+ .field("proc-time",
Types.SQL_TIMESTAMP()).proctime());
+
+ final TableSource<?> foundSource =
TableSourceFactoryService.findAndCreateTableSource(testDesc);
+
+ assertEquals(expected, foundSource);
+
+ // test Kafka consumer
+ final KafkaTableSource kafkaSource = (KafkaTableSource)
foundSource;
+ final KafkaTableSource observed = spy(kafkaSource);
+ final StreamExecutionEnvironment env =
mock(StreamExecutionEnvironment.class);
--- End diff --
Please use proper mocks here. Mockito tests are hard to maintain in the
future.
> Add unified format interfaces and format discovery
> --------------------------------------------------
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently
> only {{flink-avro}} is located there but we will add more formats such as
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of
> concerns we want to decouple connectors from formats: e.g., remove
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to
> discover available formats in the classpath (similar to how file systems are
> discovered now). A {{Format}} will provide a method for converting {{byte[]}}
> to target record type.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)