asfgit closed pull request #7208: FLINK-11044 connect docs fix for registerTableSink
URL: https://github.com/apache/flink/pull/7208
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index effd913707e..d8677714fa8 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -312,7 +312,7 @@ The following timestamp extractors are supported:
     .timestampsFromField("ts_field")    // required: original field name in the input
 )
 
-// Converts the assigned timestamps from a DataStream API record into the rowtime attribute 
+// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
 // and thus preserves the assigned timestamps from the source.
 // This requires a source that assigns timestamps (e.g., Kafka 0.10+).
 .rowtime(
@@ -337,7 +337,7 @@ rowtime:
     type: from-field
     from: "ts_field"                 # required: original field name in the input
 
-# Converts the assigned timestamps from a DataStream API record into the rowtime attribute 
+# Converts the assigned timestamps from a DataStream API record into the rowtime attribute
 # and thus preserves the assigned timestamps from the source.
 rowtime:
   timestamps:
@@ -351,7 +351,7 @@ The following watermark strategies are supported:
 <div class="codetabs" markdown="1">
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
-// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum 
+// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
 // observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
 // are not late.
 .rowtime(
@@ -377,7 +377,7 @@ The following watermark strategies are supported:
 
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
-# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum 
+# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
 # observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
 # are not late.
 rowtime:
@@ -695,7 +695,7 @@ connector:
 
 **Key extraction:** Flink automatically extracts valid keys from a query. For example, a query `SELECT a, b, c FROM t GROUP BY a, b` defines a composite key of the fields `a` and `b`. The Elasticsearch connector generates a document ID string for every row by concatenating all key fields in the order defined in the query using a key delimiter. A custom representation of null literals for key fields can be defined.
 
-<span class="label label-danger">Attention</span> A JSON format defines how to encode documents for the external system, therefore, it must be added as a [dependency](connect.html#formats). 
+<span class="label label-danger">Attention</span> A JSON format defines how to encode documents for the external system, therefore, it must be added as a [dependency](connect.html#formats).
 
 {% top %}
 
@@ -717,8 +717,8 @@ The CSV format allows to read and write comma-separated rows.
   new Csv()
     .field("field1", Types.STRING)    // required: ordered format fields
     .field("field2", Types.TIMESTAMP)
-    .fieldDelimiter(",")              // optional: string delimiter "," by default 
-    .lineDelimiter("\n")              // optional: string delimiter "\n" by default 
+    .fieldDelimiter(",")              // optional: string delimiter "," by default
+    .lineDelimiter("\n")              // optional: string delimiter "\n" by default
     .quoteCharacter('"')              // optional: single character for string values, empty by default
     .commentPrefix('#')               // optional: string to indicate comments, empty by default
     .ignoreFirstLine()                // optional: ignore the first line, by default it is not skipped
@@ -736,8 +736,8 @@ format:
       type: VARCHAR
     - name: field2
       type: TIMESTAMP
-  field-delimiter: ","       # optional: string delimiter "," by default 
-  line-delimiter: "\n"       # optional: string delimiter "\n" by default 
+  field-delimiter: ","       # optional: string delimiter "," by default
+  line-delimiter: "\n"       # optional: string delimiter "\n" by default
   quote-character: '"'       # optional: single character for string values, empty by default
   comment-prefix: '#'        # optional: string to indicate comments, empty by default
   ignore-first-line: false   # optional: boolean flag to ignore the first line, by default it is not skipped
@@ -992,7 +992,7 @@ These are the additional `TableSink`s which are provided with Flink:
 | **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
 | `CsvTableSink` | `flink-table` | Y | Append | A simple sink for CSV files.
 | `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes a Table to a JDBC table.
-| `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table. 
+| `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table.
 
 ### OrcTableSource
 
@@ -1044,7 +1044,7 @@ val orcTableSource = OrcTableSource.builder()
 
 ### CsvTableSink
 
-The `CsvTableSink` emits a `Table` to one or more CSV files. 
+The `CsvTableSink` emits a `Table` to one or more CSV files.
 
 The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming/dynamic_tables.html#table-to-stream-conversion) for details. When emitting a streaming table, rows are written at least once (if checkpointing is enabled) and the `CsvTableSink` does not split output files into bucket files but continuously writes to the same files.
 
@@ -1053,17 +1053,17 @@ The sink only supports append-only streaming tables. It cannot be used to emit a
 {% highlight java %}
 
 CsvTableSink sink = new CsvTableSink(
-    path,                  // output path 
+    path,                  // output path
     "|",                   // optional: delimit files by '|'
     1,                     // optional: write to a single file
     WriteMode.OVERWRITE);  // optional: override existing files
 
 tableEnv.registerTableSink(
   "csvOutputTable",
-  sink,
   // specify table schema
   new String[]{"f0", "f1"},
-  new TypeInformation[]{Types.STRING, Types.INT});
+  new TypeInformation[]{Types.STRING, Types.INT},
+  sink);
 
 Table table = ...
 table.insertInto("csvOutputTable");
@@ -1074,17 +1074,17 @@ table.insertInto("csvOutputTable");
 {% highlight scala %}
 
 val sink: CsvTableSink = new CsvTableSink(
-    path,                             // output path 
+    path,                             // output path
     fieldDelim = "|",                 // optional: delimit files by '|'
     numFiles = 1,                     // optional: write to a single file
     writeMode = WriteMode.OVERWRITE)  // optional: override existing files
 
 tableEnv.registerTableSink(
   "csvOutputTable",
-  sink,
   // specify table schema
   Array[String]("f0", "f1"),
-  Array[TypeInformation[_]](Types.STRING, Types.INT))
+  Array[TypeInformation[_]](Types.STRING, Types.INT),
+  sink)
 
 val table: Table = ???
 table.insertInto("csvOutputTable")
@@ -1113,10 +1113,10 @@ JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
 
 tableEnv.registerTableSink(
   "jdbcOutputTable",
-  sink,
   // specify table schema
   new String[]{"id"},
-  new TypeInformation[]{Types.INT});
+  new TypeInformation[]{Types.INT},
+  sink);
 
 Table table = ...
 table.insertInto("jdbcOutputTable");
@@ -1134,10 +1134,10 @@ val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
 
 tableEnv.registerTableSink(
   "jdbcOutputTable",
-  sink,
   // specify table schema
   Array[String]("id"),
-  Array[TypeInformation[_]](Types.INT))
+  Array[TypeInformation[_]](Types.INT),
+  sink)
 
 val table: Table = ???
 table.insertInto("jdbcOutputTable")
@@ -1145,7 +1145,7 @@ table.insertInto("jdbcOutputTable")
 </div>
 </div>
 
-Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table. 
+Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table.
 
 {% top %}
 
@@ -1164,16 +1164,16 @@ To use the `CassandraAppendTableSink`, you have to add the Cassandra connector d
 ClusterBuilder builder = ... // configure Cassandra cluster connection
 
 CassandraAppendTableSink sink = new CassandraAppendTableSink(
-  builder, 
+  builder,
   // the query must match the schema of the table
   INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?));
 
 tableEnv.registerTableSink(
   "cassandraOutputTable",
-  sink,
   // specify table schema
   new String[]{"id", "name", "value"},
-  new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE});
+  new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE},
+  sink);
 
 Table table = ...
 table.insertInto(cassandraOutputTable);
@@ -1185,16 +1185,16 @@ table.insertInto(cassandraOutputTable);
 val builder: ClusterBuilder = ... // configure Cassandra cluster connection
 
 val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
-  builder, 
+  builder,
   // the query must match the schema of the table
   INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?))
 
 tableEnv.registerTableSink(
   "cassandraOutputTable",
-  sink,
   // specify table schema
   Array[String]("id", "name", "value"),
-  Array[TypeInformation[_]](Types.INT, Types.STRING, Types.DOUBLE))
+  Array[TypeInformation[_]](Types.INT, Types.STRING, Types.DOUBLE),
+  sink)
 
 val table: Table = ???
 table.insertInto(cassandraOutputTable)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
