This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f55e6a2cd [doc] Document Flink API (#935)
f55e6a2cd is described below

commit f55e6a2cd46b291a101a716de8e7f953ae4e7534
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 18 19:19:18 2023 +0800

    [doc] Document Flink API (#935)
---
 docs/content/api/flink-api.md | 140 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 140 insertions(+)

diff --git a/docs/content/api/flink-api.md b/docs/content/api/flink-api.md
new file mode 100644
index 000000000..4ee56c0ae
--- /dev/null
+++ b/docs/content/api/flink-api.md
@@ -0,0 +1,140 @@
+---
+title: "Flink API"
+weight: 2
+type: docs
+aliases:
+- /api/flink-api.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink API
+
+## Dependency
+
+Maven dependency:
+
+```xml
+<dependency>
+  <groupId>org.apache.paimon</groupId>
+  <artifactId>paimon-flink-1.17</artifactId>
+  <version>{{< version >}}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-java-bridge_2.12</artifactId>
+  <version>1.17.0</version>
+  <scope>provided</scope>
+</dependency>
+```
+
+Or download the jar file:
+{{< stable >}}[Paimon 
Flink](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.17/{{<
 version >}}/paimon-flink-1.17-{{< version >}}.jar).{{< /stable >}}
+{{< unstable >}}[Paimon 
Flink](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.17/{{<
 version >}}/).{{< /unstable >}}
+
+Please choose your Flink version.
+
+Paimon relies on Hadoop environment, you should add hadoop classpath or 
bundled jar.
+
+Paimon does not provide a DataStream API, but you can read or write to Paimon 
tables by the conversion between DataStream and Table in Flink.
+See [DataStream API 
Integration](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/).
+
+## Write to Table 
+
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+public class WriteToTable {
+
+    public static void writeTo() {
+        // create environments of both APIs
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        // create a changelog DataStream
+        DataStream<Row> dataStream =
+                env.fromElements(
+                        Row.ofKind(RowKind.INSERT, "Alice", 12),
+                        Row.ofKind(RowKind.INSERT, "Bob", 5),
+                        Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
+                        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
+
+        // interpret the DataStream as a Table
+        Schema schema = Schema.newBuilder()
+                .column("name", DataTypes.STRING())
+                .column("age", DataTypes.INT())
+                .build();
+        Table table = tableEnv.fromChangelogStream(dataStream, schema);
+
+        // create paimon catalog
+        tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 
'warehouse'='...')");
+        tableEnv.executeSql("USE paimon");
+
+        // register the table under a name and perform an aggregation
+        tableEnv.createTemporaryView("InputTable", table);
+
+        // insert into paimon table from your data stream table
+        tableEnv.executeSql("INSERT INTO sink_paimon_table SELECT * FROM 
InputTable");
+    }
+}
+```
+
+## Read from Table
+
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+public class ReadFromTable {
+
+    public static void readFrom() throws Exception {
+        // create environments of both APIs
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        // create paimon catalog
+        tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 
'warehouse'='...')");
+        tableEnv.executeSql("USE paimon");
+
+        // convert to DataStream
+        Table table = tableEnv.sqlQuery("SELECT * FROM my_paimon_table");
+        DataStream<Row> dataStream = tableEnv.toChangelogStream(table);
+
+        // use this datastream
+        dataStream.executeAndCollect().forEachRemaining(System.out::println);
+
+        // prints:
+        // +I[Bob, 12]
+        // +I[Alice, 12]
+        // -U[Alice, 12]
+        // +U[Alice, 14]
+    }
+}
+```

Reply via email to