Repository: flink
Updated Branches:
  refs/heads/release-1.4 828ef09b0 -> 13631b961


[FLINK-8118] [table] Improve KafkaTableSource documentation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/13631b96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/13631b96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/13631b96

Branch: refs/heads/release-1.4
Commit: 13631b9617d32e46eba51c9125019ec5e77c39f3
Parents: 2fb2458
Author: twalthr <[email protected]>
Authored: Thu Nov 23 14:30:15 2017 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Thu Nov 23 15:34:22 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   | 56 ++++++++++----------
 .../kafka/KafkaTableSourceTestBase.java         |  5 +-
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/13631b96/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index aaf23bc..2b10278 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -63,7 +63,7 @@ A `KafkaJsonTableSource` is created and configured using a 
builder. The followin
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // create builder
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -80,7 +80,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // create builder
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -108,7 +108,7 @@ Map<String, String> mapping = new HashMap<>();
 mapping.put("sensorId", "id");
 mapping.put("temperature", "temp");
 
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   // set Table schema
   .withSchema(TableSchema.builder()
@@ -126,7 +126,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   // set Table schema
   .withSchema(TableSchema.builder()
@@ -150,7 +150,7 @@ val source: TableSource[_] = 
Kafka010JsonTableSource.builder()
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   // configure missing field behavior
   .failOnMissingField(true)
@@ -160,7 +160,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   // configure missing field behavior
   .failOnMissingField(true)
@@ -174,20 +174,20 @@ val source: TableSource[_] = 
Kafka010JsonTableSource.builder()
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   // start reading from the earliest offset
-  .startReadingFromEarliest()
+  .fromEarliest()
   .build();
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   // start reading from the earliest offset
-  .startReadingFromEarliest()
+  .fromEarliest()
   .build()
 {% endhighlight %}
 </div>
@@ -205,7 +205,7 @@ A `KafkaAvroTableSource` is created and configured using a 
builder. The followin
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // create builder
-TableSource source = Kafka010AvroTableSource.builder()
+KafkaTableSource source = Kafka010AvroTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -224,7 +224,7 @@ TableSource source = Kafka010AvroTableSource.builder()
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // create builder
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -256,7 +256,7 @@ Map<String, String> mapping = new HashMap<>();
 mapping.put("sensorId", "id");
 mapping.put("temperature", "temp");
 
-TableSource source = Kafka010AvroTableSource.builder()
+KafkaTableSource source = Kafka010AvroTableSource.builder()
   // ...
   // set Table schema
   .withSchema(TableSchema.builder()
@@ -264,15 +264,15 @@ TableSource source = Kafka010AvroTableSource.builder()
     .field("temperature", Types.DOUBLE()).build())
   // set class of Avro record with fields [id, temp]
   .forAvroRecordClass(SensorReading.class)
-  // set mapping from table fields to JSON fields
-  .withTableToJsonMapping(mapping)
+  // set mapping from table fields to Avro fields
+  .withTableToAvroMapping(mapping)
   .build();
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010AvroTableSource.builder()
+val source: KafkaTableSource = Kafka010AvroTableSource.builder()
   // ...
   // set Table schema
   .withSchema(TableSchema.builder()
@@ -280,8 +280,8 @@ val source: TableSource[_] = 
Kafka010AvroTableSource.builder()
     .field("temperature", Types.DOUBLE).build())
   // set class of Avro record with fields [id, temp]
   .forAvroRecordClass(classOf[SensorReading])
-  // set mapping from table fields to JSON fields
-  .withTableToJsonMapping(Map(
+  // set mapping from table fields to Avro fields
+  .withTableToAvroMapping(Map(
     "sensorId" -> "id", 
     "temperature" -> "temp").asJava)
   .build()
@@ -294,20 +294,20 @@ val source: TableSource[_] = 
Kafka010AvroTableSource.builder()
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010AvroTableSource.builder()
   // ...
   // start reading from the earliest offset
-  .startReadingFromEarliest()
+  .fromEarliest()
   .build();
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010AvroTableSource.builder()
   // ...
   // start reading from the earliest offset
-  .startReadingFromEarliest()
+  .fromEarliest()
   .build()
 {% endhighlight %}
 </div>
@@ -326,7 +326,7 @@ A table schema field of type `SQL_TIMESTAMP` can be 
declared as a processing tim
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ... 
   .withSchema(TableSchema.builder()
     .field("sensorId", Types.LONG())  
@@ -341,7 +341,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   .withSchema(TableSchema.builder()
     .field("sensorId", Types.LONG)
@@ -372,7 +372,7 @@ The following example shows how to configure a rowtime 
attribute.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   .withSchema(TableSchema.builder()
     .field("sensorId", Types.LONG())
@@ -392,7 +392,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   .withSchema(TableSchema.builder()
     .field("sensorId", Types.LONG)
@@ -418,7 +418,7 @@ Since Kafka 0.10, Kafka messages have a timestamp as 
metadata that specifies whe
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   .withSchema(TableSchema.builder()
     .field("sensorId", Types.LONG())
@@ -437,7 +437,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   .withSchema(TableSchema.builder()
     .field("sensorId", Types.LONG)

http://git-wip-us.apache.org/repos/asf/flink/blob/13631b96/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
index 64dac06..688fd73 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -159,7 +159,7 @@ public abstract class KafkaTableSourceTestBase {
        }
 
        @Test
-       public void testKafkaTSRowtimeAttribute() {
+       public void testRowtimeAttribute2() {
                KafkaTableSource.Builder b = getBuilder();
                configureBuilder(b);
 
@@ -191,7 +191,8 @@ public abstract class KafkaTableSourceTestBase {
        }
 
        @Test
-       public void testKafkaTSSetConsumeOffsets() {
+       @SuppressWarnings("unchecked")
+       public void testConsumerOffsets() {
                KafkaTableSource.Builder b = getBuilder();
                configureBuilder(b);
 

Reply via email to