[
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534867#comment-16534867
]
ASF GitHub Bot commented on FLINK-8558:
---------------------------------------
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6264#discussion_r200597054
--- Diff:
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
---
@@ -18,48 +18,99 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*/
-@PublicEvolving
-public abstract class Kafka010TableSource extends KafkaTableSource {
+@Internal
+public class Kafka010TableSource extends KafkaTableSource {
- // The deserialization schema for the Kafka records
- private final DeserializationSchema<Row> deserializationSchema;
+ /**
+ * Creates a Kafka 0.10 {@link StreamTableSource}.
+ *
+ * @param schema Schema of the produced table.
+ * @param proctimeAttribute Field name of the processing time attribute, null if no
+ * processing time field is defined.
+ * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+ * @param fieldMapping Mapping for the fields of the table schema to
+ * fields of the physical returned type or null.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema for decoding records from Kafka.
+ * @param startupMode Startup mode for the contained consumer.
+ * @param specificStartupOffsets Specific startup offsets; only relevant when startup
+ * mode is {@link StartupMode#SPECIFIC_OFFSETS}.
+ */
+ public Kafka010TableSource(
+ TableSchema schema,
+ String proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
+ String topic, Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+ super(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ fieldMapping,
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets);
+ }
+
+ /**
+ * Creates a Kafka 0.10 {@link StreamTableSource}.
+ *
+ * @param schema Schema of the produced table.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema for decoding records from Kafka.
+ */
+ public Kafka010TableSource(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema) {
+
+ super(schema, topic, properties, deserializationSchema);
+ }
/**
* Creates a Kafka 0.10 {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
- * @param typeInfo Type information describing the result type. The field names are used
- * to parse the JSON file and so are the types.
+ * @param typeInfo Not relevant anymore.
--- End diff --
Why is it not relevant? Doesn't it break backward compatibility? If so,
it would be safer to drop the constructor altogether.
The only reason for keeping it would be if **ALL** old invocations
still work the same as they used to, regardless of this value. However, even in that
case, I would be inclined to drop this constructor, since it's an easy change
for users and the class was `@PublicEvolving` and is now `@Internal`.
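To make the "keep it" option concrete: a retained constructor is only safe if it ignores the obsolete argument and delegates, so that old and new call sites build the same source. A minimal sketch, using a hypothetical `ExampleTableSource` with plain strings standing in for the schema and the type information (all names here are illustrative assumptions, not code from the pull request):

```java
import java.util.Objects;
import java.util.Properties;

// Hypothetical stand-in for a table source; NOT the real Kafka010TableSource.
final class ExampleTableSource {
    private final String schema;          // stand-in for TableSchema
    private final String topic;
    private final Properties properties;

    // New constructor: everything the source needs comes from these arguments.
    ExampleTableSource(String schema, String topic, Properties properties) {
        this.schema = Objects.requireNonNull(schema);
        this.topic = Objects.requireNonNull(topic);
        this.properties = Objects.requireNonNull(properties);
    }

    /**
     * Old constructor kept only for source compatibility.
     *
     * @param typeInfo ignored; in this sketch the result type comes from the schema.
     * @deprecated use the three-argument constructor instead.
     */
    @Deprecated
    ExampleTableSource(String schema, String topic, Properties properties, String typeInfo) {
        this(schema, topic, properties); // typeInfo is deliberately unused
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ExampleTableSource)) {
            return false;
        }
        ExampleTableSource other = (ExampleTableSource) o;
        return schema.equals(other.schema)
            && topic.equals(other.topic)
            && properties.equals(other.properties);
    }

    @Override
    public int hashCode() {
        return Objects.hash(schema, topic, properties);
    }
}
```

If ignoring the argument would change what any existing caller gets, this delegation no longer preserves behavior, and removing the constructor is the more honest change.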
> Add unified format interfaces and format discovery
> --------------------------------------------------
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently
> only {{flink-avro}} is located there but we will add more formats such as
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of
> concerns we want to decouple connectors from formats: e.g., remove
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to
> discover available formats in the classpath (similar to how file systems are
> discovered now). A {{Format}} will provide a method for converting {{byte[]}}
> to the target record type.
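
The discovery mechanism described in the issue can be sketched with `java.util.ServiceLoader`. The `FormatFactory` interface below is a hypothetical simplification: the method names `formatType` and `deserialize` are assumptions and not the API from the pull request. Providers would be registered by listing their class names in a `META-INF/services/` file named after the interface's fully qualified name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical, simplified interface; the real FormatFactory may look different.
interface FormatFactory {
    // Identifier a connector would use to request this format, e.g. "json".
    String formatType();

    // Converts raw bytes into a record of the target type.
    Object deserialize(byte[] bytes);
}

final class FormatDiscovery {
    private FormatDiscovery() {}

    // Collects every FormatFactory registered under META-INF/services
    // that is visible to the given class loader.
    static List<FormatFactory> discover(ClassLoader classLoader) {
        List<FormatFactory> found = new ArrayList<>();
        for (FormatFactory factory : ServiceLoader.load(FormatFactory.class, classLoader)) {
            found.add(factory);
        }
        return found;
    }

    // Returns the first factory whose type matches, or fails loudly.
    static FormatFactory find(String type, Iterable<FormatFactory> factories) {
        for (FormatFactory factory : factories) {
            if (factory.formatType().equals(type)) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No format factory found for type: " + type);
    }
}
```

A connector would then call something like `FormatDiscovery.find("json", FormatDiscovery.discover(classLoader))` and never reference a concrete format class, which is the decoupling the issue asks for.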