asfgit closed pull request #7208: FLINK-11044 connect docs fix for registerTableSink
URL: https://github.com/apache/flink/pull/7208
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index effd913707e..d8677714fa8 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -312,7 +312,7 @@ The following timestamp extractors are supported:
  .timestampsFromField("ts_field")  // required: original field name in the input
)
-// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
+// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
// and thus preserves the assigned timestamps from the source.
// This requires a source that assigns timestamps (e.g., Kafka 0.10+).
.rowtime(
@@ -337,7 +337,7 @@ rowtime:
type: from-field
from: "ts_field" # required: original field name in the
input
-# Converts the assigned timestamps from a DataStream API record into the rowtime attribute
+# Converts the assigned timestamps from a DataStream API record into the rowtime attribute
# and thus preserves the assigned timestamps from the source.
rowtime:
timestamps:
@@ -351,7 +351,7 @@ The following watermark strategies are supported:
<div class="codetabs" markdown="1">
<div data-lang="Java/Scala" markdown="1">
{% highlight java %}
-// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
+// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
// observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
// are not late.
.rowtime(
@@ -377,7 +377,7 @@ The following watermark strategies are supported:
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
-# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
+# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
# observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
# are not late.
rowtime:
@@ -695,7 +695,7 @@ connector:
**Key extraction:** Flink automatically extracts valid keys from a query. For
example, a query `SELECT a, b, c FROM t GROUP BY a, b` defines a composite key
of the fields `a` and `b`. The Elasticsearch connector generates a document ID
string for every row by concatenating all key fields in the order defined in
the query using a key delimiter. A custom representation of null literals for
key fields can be defined.
-<span class="label label-danger">Attention</span> A JSON format defines how to
encode documents for the external system, therefore, it must be added as a
[dependency](connect.html#formats).
+<span class="label label-danger">Attention</span> A JSON format defines how to
encode documents for the external system, therefore, it must be added as a
[dependency](connect.html#formats).
{% top %}
@@ -717,8 +717,8 @@ The CSV format allows to read and write comma-separated rows.
new Csv()
.field("field1", Types.STRING) // required: ordered format fields
.field("field2", Types.TIMESTAMP)
-  .fieldDelimiter(",")   // optional: string delimiter "," by default
-  .lineDelimiter("\n")   // optional: string delimiter "\n" by default
+  .fieldDelimiter(",")   // optional: string delimiter "," by default
+  .lineDelimiter("\n")   // optional: string delimiter "\n" by default
  .quoteCharacter('"')   // optional: single character for string values, empty by default
  .commentPrefix('#')    // optional: string to indicate comments, empty by default
  .ignoreFirstLine()     // optional: ignore the first line, by default it is not skipped
@@ -736,8 +736,8 @@ format:
type: VARCHAR
- name: field2
type: TIMESTAMP
- field-delimiter: "," # optional: string delimiter "," by default
- line-delimiter: "\n" # optional: string delimiter "\n" by default
+ field-delimiter: "," # optional: string delimiter "," by default
+ line-delimiter: "\n" # optional: string delimiter "\n" by default
  quote-character: '"'       # optional: single character for string values, empty by default
  comment-prefix: '#'        # optional: string to indicate comments, empty by default
  ignore-first-line: false   # optional: boolean flag to ignore the first line, by default it is not skipped
@@ -992,7 +992,7 @@ These are the additional `TableSink`s which are provided with Flink:
| **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
| `CsvTableSink` | `flink-table` | Y | Append | A simple sink for CSV files.
| `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes a Table to a JDBC table.
-| `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table.
+| `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table.
### OrcTableSource
@@ -1044,7 +1044,7 @@ val orcTableSource = OrcTableSource.builder()
### CsvTableSink
-The `CsvTableSink` emits a `Table` to one or more CSV files.
+The `CsvTableSink` emits a `Table` to one or more CSV files.
The sink only supports append-only streaming tables. It cannot be used to emit
a `Table` that is continuously updated. See the [documentation on Table to
Stream conversions](./streaming/dynamic_tables.html#table-to-stream-conversion)
for details. When emitting a streaming table, rows are written at least once
(if checkpointing is enabled) and the `CsvTableSink` does not split output
files into bucket files but continuously writes to the same files.
@@ -1053,17 +1053,17 @@ The sink only supports append-only streaming tables. It cannot be used to emit a
{% highlight java %}
CsvTableSink sink = new CsvTableSink(
- path, // output path
+ path, // output path
"|", // optional: delimit files by '|'
1, // optional: write to a single file
WriteMode.OVERWRITE); // optional: override existing files
tableEnv.registerTableSink(
"csvOutputTable",
- sink,
// specify table schema
new String[]{"f0", "f1"},
- new TypeInformation[]{Types.STRING, Types.INT});
+ new TypeInformation[]{Types.STRING, Types.INT},
+ sink);
Table table = ...
table.insertInto("csvOutputTable");
@@ -1074,17 +1074,17 @@ table.insertInto("csvOutputTable");
{% highlight scala %}
val sink: CsvTableSink = new CsvTableSink(
- path, // output path
+ path, // output path
fieldDelim = "|", // optional: delimit files by '|'
numFiles = 1, // optional: write to a single file
writeMode = WriteMode.OVERWRITE) // optional: override existing files
tableEnv.registerTableSink(
"csvOutputTable",
- sink,
// specify table schema
Array[String]("f0", "f1"),
- Array[TypeInformation[_]](Types.STRING, Types.INT))
+ Array[TypeInformation[_]](Types.STRING, Types.INT),
+ sink)
val table: Table = ???
table.insertInto("csvOutputTable")
@@ -1113,10 +1113,10 @@ JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
tableEnv.registerTableSink(
"jdbcOutputTable",
- sink,
// specify table schema
new String[]{"id"},
- new TypeInformation[]{Types.INT});
+ new TypeInformation[]{Types.INT},
+ sink);
Table table = ...
table.insertInto("jdbcOutputTable");
@@ -1134,10 +1134,10 @@ val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
tableEnv.registerTableSink(
"jdbcOutputTable",
- sink,
// specify table schema
Array[String]("id"),
- Array[TypeInformation[_]](Types.INT))
+ Array[TypeInformation[_]](Types.INT),
+ sink)
val table: Table = ???
table.insertInto("jdbcOutputTable")
@@ -1145,7 +1145,7 @@ table.insertInto("jdbcOutputTable")
</div>
</div>
-Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table.
+Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table.
{% top %}
@@ -1164,16 +1164,16 @@ To use the `CassandraAppendTableSink`, you have to add the Cassandra connector d
ClusterBuilder builder = ... // configure Cassandra cluster connection
CassandraAppendTableSink sink = new CassandraAppendTableSink(
- builder,
+ builder,
// the query must match the schema of the table
  "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)");
tableEnv.registerTableSink(
"cassandraOutputTable",
- sink,
// specify table schema
new String[]{"id", "name", "value"},
- new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE});
+ new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE},
+ sink);
Table table = ...
table.insertInto("cassandraOutputTable");
@@ -1185,16 +1185,16 @@ table.insertInto(cassandraOutputTable);
val builder: ClusterBuilder = ... // configure Cassandra cluster connection
val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
- builder,
+ builder,
// the query must match the schema of the table
  "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)")
tableEnv.registerTableSink(
"cassandraOutputTable",
- sink,
// specify table schema
Array[String]("id", "name", "value"),
- Array[TypeInformation[_]](Types.INT, Types.STRING, Types.DOUBLE))
+ Array[TypeInformation[_]](Types.INT, Types.STRING, Types.DOUBLE),
+ sink)
val table: Table = ???
table.insertInto("cassandraOutputTable")
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services