Github user uybhatti commented on a diff in the pull request:
https://github.com/apache/flink/pull/4638#discussion_r139692943
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
---
@@ -62,10 +88,107 @@
DeserializationSchema<Row> deserializationSchema,
TypeInformation<Row> typeInfo) {
- this.topic = Preconditions.checkNotNull(topic, "Topic");
- this.properties = Preconditions.checkNotNull(properties,
"Properties");
- this.deserializationSchema =
Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
- this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type
information");
+ this.topic = Preconditions.checkNotNull(topic, "Topic must not
be null.");
+ this.properties = Preconditions.checkNotNull(properties,
"Properties must not be null.");
+ this.deserializationSchema =
Preconditions.checkNotNull(deserializationSchema, "Deserialization schema must
not be null.");
+ this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type
information must not be null.");
+ }
+
+ /**
+ * Adds processing time attribute to the table. The attribute is
appended to each row.
+ *
+ * @param proctime The name of the added processing time attribute.
+ */
+ public void addProcTimeAttribute(String proctime) {
+ Preconditions.checkNotNull(proctime, "Processing time attribute
must not be null.");
+ this.procTimeAttribute = proctime;
+ }
+
+ /**
+ * Adds an ingestion time attribute to the table. The attribute is
append at the end of each row.
+ *
+ * <p>For each row, the ingestion time attribute is initialized with
the current time when the row
+ * is read from Kafka. From there on, it behaves as an event time
attribute.
+ *
+ * @param ingestionTime The name of the added ingestion time attribute.
+ */
+ public void addIngestionTimeAttribute(String ingestionTime) {
+ Preconditions.checkNotNull(ingestionTime, "Ingestion time
attribute must not be null.");
+ if (this.rowTimeAttribute != null) {
+ throw new ValidationException(
+ "You can only specify an ingestion time
attribute OR a row time attribute.");
+ }
+ this.rowTimeAttribute = ingestionTime;
--- End diff --
it should be `this.ingestionTimeAttribute = ingestionTime;`
Otherwise no need of `ingestionTimeAttribute` variable
---