[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534932#comment-16534932 ]
ASF GitHub Bot commented on FLINK-8558:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6264#discussion_r200677056
--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java ---
@@ -18,48 +18,99 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*/
-@PublicEvolving
-public abstract class Kafka010TableSource extends KafkaTableSource {
+@Internal
+public class Kafka010TableSource extends KafkaTableSource {
-	// The deserialization schema for the Kafka records
-	private final DeserializationSchema<Row> deserializationSchema;
+	/**
+	 * Creates a Kafka 0.10 {@link StreamTableSource}.
+	 *
+	 * @param schema                      Schema of the produced table.
+	 * @param proctimeAttribute           Field name of the processing time attribute,
+	 *                                    null if no processing time field is defined.
+	 * @param rowtimeAttributeDescriptors Descriptors for rowtime attributes.
+	 * @param fieldMapping                Mapping for the fields of the table schema to
+	 *                                    fields of the physical returned type, or null.
+	 * @param topic                       Kafka topic to consume.
+	 * @param properties                  Properties for the Kafka consumer.
+	 * @param deserializationSchema       Deserialization schema for decoding records from Kafka.
+	 * @param startupMode                 Startup mode for the contained consumer.
+	 * @param specificStartupOffsets      Specific startup offsets; only relevant when the
+	 *                                    startup mode is {@link StartupMode#SPECIFIC_OFFSETS}.
+	 */
+	public Kafka010TableSource(
+			TableSchema schema,
+			String proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		super(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			fieldMapping,
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets);
+	}
+
+	/**
+	 * Creates a Kafka 0.10 {@link StreamTableSource}.
+	 *
+	 * @param schema                Schema of the produced table.
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema for decoding records from Kafka.
+	 */
+	public Kafka010TableSource(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema) {
+
+		super(schema, topic, properties, deserializationSchema);
+	}
	/**
	 * Creates a Kafka 0.10 {@link StreamTableSource}.
	 *
	 * @param topic                 Kafka topic to consume.
	 * @param properties            Properties for the Kafka consumer.
	 * @param deserializationSchema Deserialization schema to use for Kafka records.
-	 * @param typeInfo              Type information describing the result type. The field
-	 *                              names are used to parse the JSON file and so are the types.
+	 * @param typeInfo              Not relevant anymore.
--- End diff ---
The `typeInfo` should have been the same as the produced type returned by
the `deserializationSchema`. I'm also fine with dropping the constructor
already.
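For context, the invariant this review comment points at can be sketched in plain Java. This is a hypothetical stand-alone illustration: `SimpleTypeInfo` and `checkProducedType` are made-up stand-ins, not Flink classes; in Flink the produced type would come from the deserialization schema itself (via `getProducedType()`).

```java
import java.util.Objects;

// Hypothetical sketch of the invariant discussed above: the typeInfo passed
// to the table source must equal the type produced by the deserialization
// schema. All class names here are made-up stand-ins, not Flink types.
public class ProducedTypeCheck {

    /** Minimal stand-in for Flink's TypeInformation. */
    static final class SimpleTypeInfo {
        private final String[] fieldNames;

        SimpleTypeInfo(String... fieldNames) {
            this.fieldNames = fieldNames;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof SimpleTypeInfo
                    && java.util.Arrays.equals(fieldNames, ((SimpleTypeInfo) o).fieldNames);
        }

        @Override
        public int hashCode() {
            return java.util.Arrays.hashCode(fieldNames);
        }
    }

    /** Fails fast when the declared type disagrees with the schema's produced type. */
    static void checkProducedType(SimpleTypeInfo declared, SimpleTypeInfo produced) {
        if (!Objects.equals(declared, produced)) {
            throw new IllegalArgumentException(
                    "typeInfo must match the produced type of the deserialization schema");
        }
    }

    public static void main(String[] args) {
        SimpleTypeInfo declared = new SimpleTypeInfo("id", "name");
        SimpleTypeInfo produced = new SimpleTypeInfo("id", "name");
        checkProducedType(declared, produced); // no exception: types agree
        System.out.println("types consistent");
    }
}
```

Dropping the redundant constructor removes the possibility of this mismatch entirely, which is why the reviewer is fine with deleting it.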
> Add unified format interfaces and format discovery
> --------------------------------------------------
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently
> only {{flink-avro}} is located there but we will add more formats such as
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of
> concerns we want to decouple connectors from formats: e.g., remove
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to
> discover available formats in the classpath (similar to how file systems are
> discovered now). A {{Format}} will provide a method for converting {{byte[]}}
> to the target record type.
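The discovery mechanism described above can be sketched with plain `java.util.ServiceLoader`. The interface and method names below (`DeserializationFormat`, `formatType`) are assumptions for illustration only, not the interfaces this issue actually introduces; real third-party formats would be registered through `META-INF/services` entries on the classpath.

```java
import java.nio.charset.StandardCharsets;
import java.util.ServiceLoader;

// Hypothetical sketch of service-loader based format discovery, as described
// in the issue. Interface and method names are assumptions for illustration.
public class FormatDiscoverySketch {

    /** A format converts raw bytes into a target record type. */
    public interface DeserializationFormat<T> {
        /** Identifier that user-supplied properties are matched against. */
        String formatType();

        T deserialize(byte[] message);
    }

    /** Trivial built-in example format: bytes -> UTF-8 string. */
    public static final class Utf8Format implements DeserializationFormat<String> {
        @Override
        public String formatType() {
            return "utf-8";
        }

        @Override
        public String deserialize(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }
    }

    /**
     * Looks up a format by type among service-loaded implementations
     * (registered via META-INF/services files on the classpath), falling
     * back to a built-in format so this self-contained sketch still runs.
     */
    @SuppressWarnings("rawtypes")
    public static DeserializationFormat findFormat(String type) {
        for (DeserializationFormat format : ServiceLoader.load(DeserializationFormat.class)) {
            if (format.formatType().equals(type)) {
                return format;
            }
        }
        if ("utf-8".equals(type)) {
            return new Utf8Format(); // fallback so the sketch runs without service files
        }
        throw new IllegalArgumentException("No format found for type: " + type);
    }

    public static void main(String[] args) {
        Object record = findFormat("utf-8")
                .deserialize("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(record); // prints "hello"
    }
}
```

This mirrors how Flink discovers file systems: providers advertise themselves in service files, and the factory matches user-supplied properties against the advertised format type, so connectors no longer need format-specific subclasses like {{KafkaJsonTableSource}}.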
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)