Repository: flink Updated Branches: refs/heads/master 458c909ca -> 7ff3f373a
[FLINK-8118] [table] Fix documentation mistakes Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ff3f373 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ff3f373 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ff3f373 Branch: refs/heads/master Commit: 7ff3f373a353f9ed8cd1973f2bde172ab7bd992d Parents: 4083c70 Author: twalthr <[email protected]> Authored: Thu Nov 23 14:30:15 2017 +0100 Committer: twalthr <[email protected]> Committed: Thu Nov 23 14:49:14 2017 +0100 ---------------------------------------------------------------------- docs/dev/table/sourceSinks.md | 56 ++++++++++---------- .../kafka/KafkaTableSourceTestBase.java | 5 +- 2 files changed, 31 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7ff3f373/docs/dev/table/sourceSinks.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md index aaf23bc..2b10278 100644 --- a/docs/dev/table/sourceSinks.md +++ b/docs/dev/table/sourceSinks.md @@ -63,7 +63,7 @@ A `KafkaJsonTableSource` is created and configured using a builder. The followin <div data-lang="java" markdown="1"> {% highlight java %} // create builder -TableSource source = Kafka010JsonTableSource.builder() +KafkaTableSource source = Kafka010JsonTableSource.builder() // set Kafka topic .forTopic("sensors") // set Kafka consumer properties @@ -80,7 +80,7 @@ TableSource source = Kafka010JsonTableSource.builder() <div data-lang="scala" markdown="1"> {% highlight scala %} // create builder -val source: TableSource[_] = Kafka010JsonTableSource.builder() +val source: KafkaTableSource = Kafka010JsonTableSource.builder() // set Kafka topic .forTopic("sensors") // set Kafka consumer properties @@ -108,7 +108,7 @@ Map<String, String> mapping = new HashMap<>(); mapping.put("sensorId", "id"); mapping.put("temperature", "temp"); -TableSource source = Kafka010JsonTableSource.builder() +KafkaTableSource source = Kafka010JsonTableSource.builder() // ... // set Table schema .withSchema(TableSchema.builder() @@ -126,7 +126,7 @@ TableSource source = Kafka010JsonTableSource.builder() <div data-lang="scala" markdown="1"> {% highlight scala %} -val source: TableSource[_] = Kafka010JsonTableSource.builder() +val source: KafkaTableSource = Kafka010JsonTableSource.builder() // ... // set Table schema .withSchema(TableSchema.builder() @@ -150,7 +150,7 @@ val source: TableSource[_] = Kafka010JsonTableSource.builder() <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -TableSource source = Kafka010JsonTableSource.builder() +KafkaTableSource source = Kafka010JsonTableSource.builder() // ... // configure missing field behavior .failOnMissingField(true) @@ -160,7 +160,7 @@ TableSource source = Kafka010JsonTableSource.builder() <div data-lang="scala" markdown="1"> {% highlight scala %} -val source: TableSource[_] = Kafka010JsonTableSource.builder() +val source: KafkaTableSource = Kafka010JsonTableSource.builder() // ... // configure missing field behavior .failOnMissingField(true) @@ -174,20 +174,20 @@ val source: TableSource[_] = Kafka010JsonTableSource.builder() <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -TableSource source = Kafka010JsonTableSource.builder() +KafkaTableSource source = Kafka010JsonTableSource.builder() // ... // start reading from the earliest offset - .startReadingFromEarliest() + .fromEarliest() .build(); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -val source: TableSource[_] = Kafka010JsonTableSource.builder() +val source: KafkaTableSource = Kafka010JsonTableSource.builder() // ... // start reading from the earliest offset - .startReadingFromEarliest() + .fromEarliest() .build() {% endhighlight %} </div> @@ -205,7 +205,7 @@ A `KafkaAvroTableSource` is created and configured using a builder. The followin <div data-lang="java" markdown="1"> {% highlight java %} // create builder -TableSource source = Kafka010AvroTableSource.builder() +KafkaTableSource source = Kafka010AvroTableSource.builder() // set Kafka topic .forTopic("sensors") // set Kafka consumer properties @@ -224,7 +224,7 @@ TableSource source = Kafka010AvroTableSource.builder() <div data-lang="scala" markdown="1"> {% highlight scala %} // create builder -val source: TableSource[_] = Kafka010JsonTableSource.builder() +val source: KafkaTableSource = Kafka010JsonTableSource.builder() // set Kafka topic .forTopic("sensors") // set Kafka consumer properties @@ -256,7 +256,7 @@ Map<String, String> mapping = new HashMap<>(); mapping.put("sensorId", "id"); mapping.put("temperature", "temp"); -TableSource source = Kafka010AvroTableSource.builder() +KafkaTableSource source = Kafka010AvroTableSource.builder() // ... // set Table schema .withSchema(TableSchema.builder() @@ -264,15 +264,15 @@ TableSource source = Kafka010AvroTableSource.builder() .field("temperature", Types.DOUBLE()).build()) // set class of Avro record with fields [id, temp] .forAvroRecordClass(SensorReading.class) - // set mapping from table fields to JSON fields - .withTableToJsonMapping(mapping) + // set mapping from table fields to Avro fields + .withTableToAvroMapping(mapping) .build(); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -val source: TableSource[_] = Kafka010AvroTableSource.builder() +val source: KafkaTableSource = Kafka010AvroTableSource.builder() // ... // set Table schema .withSchema(TableSchema.builder() @@ -280,8 +280,8 @@ val source: TableSource[_] = Kafka010AvroTableSource.builder() .field("temperature", Types.DOUBLE).build()) // set class of Avro record with fields [id, temp] .forAvroRecordClass(classOf[SensorReading]) - // set mapping from table fields to JSON fields - .withTableToJsonMapping(Map( + // set mapping from table fields to Avro fields + .withTableToAvroMapping(Map( "sensorId" -> "id", "temperature" -> "temp").asJava) .build() @@ -294,20 +294,20 @@ val source: TableSource[_] = Kafka010AvroTableSource.builder() <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -TableSource source = Kafka010JsonTableSource.builder() +KafkaTableSource source = Kafka010AvroTableSource.builder() // ... // start reading from the earliest offset - .startReadingFromEarliest() + .fromEarliest() .build(); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -val source: TableSource[_] = Kafka010JsonTableSource.builder() +val source: KafkaTableSource = Kafka010AvroTableSource.builder() // ... // start reading from the earliest offset - .startReadingFromEarliest() + .fromEarliest() .build() {% endhighlight %} </div> @@ -326,7 +326,7 @@ A table schema field of type `SQL_TIMESTAMP` can be declared as a processing tim <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -TableSource source = Kafka010JsonTableSource.builder() +KafkaTableSource source = Kafka010JsonTableSource.builder() // ... .withSchema(TableSchema.builder() .field("sensorId", Types.LONG()) @@ -341,7 +341,7 @@ TableSource source = Kafka010JsonTableSource.builder() <div data-lang="scala" markdown="1"> {% highlight scala %} -val source: TableSource[_] = Kafka010JsonTableSource.builder() +val source: KafkaTableSource = Kafka010JsonTableSource.builder() // ... .withSchema(TableSchema.builder() .field("sensorId", Types.LONG) @@ -372,7 +372,7 @@ The following example shows how to configure a rowtime attribute. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -TableSource source = Kafka010JsonTableSource.builder() +KafkaTableSource source = Kafka010JsonTableSource.builder() // ... .withSchema(TableSchema.builder() .field("sensorId", Types.LONG()) @@ -392,7 +392,7 @@ TableSource source = Kafka010JsonTableSource.builder() <div data-lang="scala" markdown="1"> {% highlight scala %} -val source: TableSource[_] = Kafka010JsonTableSource.builder() +val source: KafkaTableSource = Kafka010JsonTableSource.builder() // ... .withSchema(TableSchema.builder() .field("sensorId", Types.LONG) @@ -418,7 +418,7 @@ Since Kafka 0.10, Kafka messages have a timestamp as metadata that specifies whe <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -TableSource source = Kafka010JsonTableSource.builder() +KafkaTableSource source = Kafka010JsonTableSource.builder() // ... .withSchema(TableSchema.builder() .field("sensorId", Types.LONG()) @@ -437,7 +437,7 @@ TableSource source = Kafka010JsonTableSource.builder() <div data-lang="scala" markdown="1"> {% highlight scala %} -val source: TableSource[_] = Kafka010JsonTableSource.builder() +val source: KafkaTableSource = Kafka010JsonTableSource.builder() // ... .withSchema(TableSchema.builder() .field("sensorId", Types.LONG) http://git-wip-us.apache.org/repos/asf/flink/blob/7ff3f373/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java index 64dac06..688fd73 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java @@ -159,7 +159,7 @@ public abstract class KafkaTableSourceTestBase { } @Test - public void testKafkaTSRowtimeAttribute() { + public void testRowtimeAttribute2() { KafkaTableSource.Builder b = getBuilder(); configureBuilder(b); @@ -191,7 +191,8 @@ public abstract class KafkaTableSourceTestBase { } @Test - public void testKafkaTSSetConsumeOffsets() { + @SuppressWarnings("unchecked") + public void testConsumerOffsets() { KafkaTableSource.Builder b = getBuilder(); configureBuilder(b);
