[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16543231#comment-16543231 ]
ASF GitHub Bot commented on FLINK-8558:
---------------------------------------
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6323#discussion_r202269870
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -134,34 +190,60 @@ public String getProctimeAttribute() {
return rowtimeAttributeDescriptors;
}
+ @Override
+ public Map<String, String> getFieldMapping() {
+ return fieldMapping;
+ }
+
@Override
public String explainSource() {
return TableConnectorUtil.generateRuntimeName(this.getClass(),
schema.getColumnNames());
}
+ /**
+ * Returns the properties for the Kafka consumer.
+ *
+ * @return properties for the Kafka consumer.
+ */
+ public Properties getProperties() {
+ return properties;
+ }
+
+ /**
+ * Returns the deserialization schema.
+ *
+ * @return The deserialization schema
+ */
+ public DeserializationSchema<Row> getDeserializationSchema(){
+ return deserializationSchema;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
- if (!(o instanceof KafkaTableSource)) {
+ // TODO force classes to be equal once we drop support for format-specific table sources
+ // if (o == null || getClass() != o.getClass()) {
+ if (o == null || !(o instanceof KafkaTableSource)) {
return false;
}
- KafkaTableSource that = (KafkaTableSource) o;
+ final KafkaTableSource that = (KafkaTableSource) o;
return Objects.equals(schema, that.schema) &&
- Objects.equals(topic, that.topic) &&
- Objects.equals(properties, that.properties) &&
- Objects.equals(returnType, that.returnType) &&
Objects.equals(proctimeAttribute, that.proctimeAttribute) &&
Objects.equals(rowtimeAttributeDescriptors, that.rowtimeAttributeDescriptors) &&
+ Objects.equals(fieldMapping, that.fieldMapping) &&
+ Objects.equals(topic, that.topic) &&
+ Objects.equals(properties, that.properties) &&
+ Objects.equals(deserializationSchema, that.deserializationSchema) &&
startupMode == that.startupMode &&
Objects.equals(specificStartupOffsets, that.specificStartupOffsets);
}
@Override
public int hashCode() {
- return Objects.hash(schema, topic, properties, returnType,
- proctimeAttribute, rowtimeAttributeDescriptors, startupMode, specificStartupOffsets);
+ return Objects.hash(schema, proctimeAttribute, rowtimeAttributeDescriptors, fieldMapping,
--- End diff --
format one entry per line
> Add unified format interfaces and format discovery
> --------------------------------------------------
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently
> only {{flink-avro}} is located there but we will add more formats such as
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of
> concerns we want to decouple connectors from formats: e.g., remove
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to
> discover available formats in the classpath (similar to how file systems are
> discovered now). A {{Format}} will provide a method for converting {{byte[]}}
> to target record type.
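The quoted description outlines a {{FormatFactory}} discovered via Java's ServiceLoader mechanism and a {{Format}} that converts {{byte[]}} into a target record type. A rough sketch of that design is below; all class and method names here are hypothetical illustrations, not the actual interfaces the Flink PR introduces.

```java
import java.nio.charset.StandardCharsets;
import java.util.ServiceLoader;

// Hypothetical sketch of the FLINK-8558 design: a Format turns bytes into
// records, and factories are discovered on the classpath via ServiceLoader.
public class FormatDiscoverySketch {

    /** A format converts raw serialized bytes into a target record type. */
    interface Format<T> {
        /** Deserializes a single serialized record. */
        T deserialize(byte[] bytes);
    }

    /** Factory discovered via the Java service-loader mechanism. */
    interface FormatFactory {
        /** Identifier used for lookup, e.g. "json" or "avro". */
        String identifier();

        Format<String> createFormat();
    }

    /** A trivial example format that just decodes UTF-8 bytes to a String. */
    static final class Utf8Format implements Format<String> {
        @Override
        public String deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    /**
     * Looks up a factory by identifier on the classpath. In a real setup each
     * factory jar would ship a provider-configuration file under
     * META-INF/services/; without such a registration the loader finds nothing.
     */
    static FormatFactory discover(String identifier) {
        for (FormatFactory factory : ServiceLoader.load(FormatFactory.class)) {
            if (factory.identifier().equals(identifier)) {
                return factory;
            }
        }
        throw new IllegalStateException("No format factory found for: " + identifier);
    }

    public static void main(String[] args) {
        Format<String> format = new Utf8Format();
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(format.deserialize(payload)); // prints "hello"
    }
}
```

This separation lets a connector such as the Kafka table source hold a generic {{DeserializationSchema}} (as in the diff above) instead of hard-coding one table source class per format.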
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)