This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 98c4c9899 [doc] Document Java API (#849)
98c4c9899 is described below

commit 98c4c98994587140aec3fae3594ed60174dc9e30
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 7 16:40:46 2023 +0800

    [doc] Document Java API (#849)
---
 .../configurations.md => api/_index.md}            |  36 +--
 docs/content/api/java-api.md                       | 319 +++++++++++++++++++++
 docs/content/maintenance/configurations.md         |   2 +-
 .../paimon/table/sink/BatchWriteBuilder.java       |   4 +-
 4 files changed, 327 insertions(+), 34 deletions(-)

diff --git a/docs/content/maintenance/configurations.md 
b/docs/content/api/_index.md
similarity index 61%
copy from docs/content/maintenance/configurations.md
copy to docs/content/api/_index.md
index fdc584386..0c720c87f 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/api/_index.md
@@ -1,9 +1,9 @@
 ---
-title: "Configurations"
-weight: 10
-type: docs
-aliases:
-- /development/configurations.html
+title: Api
+icon: <i class="fa fa-briefcase title maindish" aria-hidden="true"></i>
+bold: true
+bookCollapseSection: true
+weight: 7
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -23,29 +23,3 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-
-# Configuration
-
-### CoreOptions
-
-Core options for paimon.
-
-{{< generated/core_configuration >}}
-
-### CatalogOptions
-
-Options for paimon catalog.
-
-{{< generated/catalog_configuration >}}
-
-### HiveCatalogOptions
-
-Options for Hive catalog.
-
-{{< generated/hive_catalog_configuration >}}
-
-### FlinkConnectorOptions
-
-Flink connector options for paimon.
-
-{{< generated/flink_connector_configuration >}}
diff --git a/docs/content/api/java-api.md b/docs/content/api/java-api.md
new file mode 100644
index 000000000..3b1e112ae
--- /dev/null
+++ b/docs/content/api/java-api.md
@@ -0,0 +1,319 @@
+---
+title: "Java API"
+weight: 1
+type: docs
+aliases:
+- /api/java-api.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Java API
+
+## Dependency
+
+Maven dependency:
+
+```xml
+<dependency>
+  <groupId>org.apache.paimon</groupId>
+  <artifactId>paimon-bundle</artifactId>
+  <version>{{< version >}}</version>
+</dependency>
+```
+
+Or download the jar file:
+{{< stable >}}[Paimon 
Bundle](https://www.apache.org/dyn/closer.lua/flink/paimon-{{< version 
>}}/paimon-bundle-{{< version >}}.jar).{{< /stable >}}
+{{< unstable >}}[Paimon 
Bundle](https://repository.apache.org/snapshots/org/apache/paimon/paimon-bundle/{{<
 version >}}/).{{< /unstable >}}
+
+Paimon relies on Hadoop environment, you should add hadoop classpath or 
bundled jar.
+
+## Create Catalog
+
+Before coming into contact with the Table, you need to create a Catalog.
+
+```java
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+
+public class CreateCatalog {
+
+    public static void createFilesystemCatalog() {
+        CatalogContext context = CatalogContext.create(new Path("..."));
+        Catalog catalog = CatalogFactory.createCatalog(context);
+    }
+
+    public static void createHiveCatalog() {
+        // Paimon Hive catalog relies on Hive jars
+        // You should add hive classpath or hive bundled jar.
+        Options options = new Options();
+        options.set("warehouse", "...");
+        options.set("metastore", "hive");
+        options.set("uri", "...");
+        options.set("hive-conf-dir", "...");
+        CatalogContext context = CatalogContext.create(options);
+        Catalog catalog = CatalogFactory.createCatalog(context);
+    }
+}
+```
+
+## Create Table
+
+You can use the catalog to create tables. The created tables are persistence 
in the file system.
+Next time you can directly obtain these tables.
+
+```java
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+
+public class CreateTable {
+
+    public static void main(String[] args) {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.primaryKey("...");
+        schemaBuilder.partitionKeys("...");
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        Schema schema = schemaBuilder.build();
+
+        Identifier identifier = Identifier.create("my_db", "my_table");
+        try {
+            catalog.createTable(identifier, schema, false);
+        } catch (Catalog.TableAlreadyExistException e) {
+            // do something
+        } catch (Catalog.DatabaseNotExistException e) {
+            // do something
+        }
+    }
+}
+```
+
+## Get Table
+
+The `Table` interface provides access to the table metadata and tools to read 
and write table.
+
+```java
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+
+public class GetTable {
+
+    public static void main(String[] args) {
+        Identifier identifier = Identifier.create("my_db", "my_table");
+        try {
+            Table table = catalog.getTable(identifier);
+        } catch (Catalog.TableNotExistException e) {
+            // do something
+        }
+    }
+}
+```
+
+Table metadata:
+
+- `name` return a name string to identify this table.
+- `rowType` return the current row type of this table containing a sequence of 
table's fields.
+- `partitionKeys` returns the partition keys of this table.
+- `parimaryKeys` returns the primary keys of this table.
+- `options` returns the configuration of this table in a map of key-value.
+- `comment` returns the optional comment of this table.
+- `copy` return a new table by applying dynamic options to this table.
+
+## Batch Read
+
+For relatively small amounts of data, or for data that has undergone 
projection and filtering,
+you can directly use a standalone program to read the table data.
+
+But if the data volume of the table is relatively large, you can distribute 
splits to different tasks for reading.
+
+The reading is divided into two stages:
+
+1. Scan Plan: Generate plan splits in a global node ('Coordinator', or named 
'Driver').
+2. Read Split: Read split in distributed tasks.
+
+```java
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+
+import java.io.IOException;
+import java.util.List;
+
+public class ReadTable {
+
+    public static void main(String[] args) {
+        // 1. Create a ReadBuilder and push filter (`withFilter`)
+        // and projection (`withProjection`) if necessary
+        ReadBuilder readBuilder = table.newReadBuilder()
+                .withProjection(projection)
+                .withFilter(filter);
+
+        // 2. Plan splits in 'Coordinator' (or named 'Driver')
+        List<Split> splits = readBuilder.newScan().plan().splits();
+
+        // 3. Distribute these splits to different tasks
+
+        // 4. Read a split in task
+        TableRead read = readBuilder.newRead();
+        RecordReader<InternalRow> reader = read.createReader(splits);
+        reader.forEachRemaining(ReadTable::readRow);
+    }
+}
+```
+
+## Batch Write
+
+The writing is divided into two stages:
+
+1. Write records: Write records in distributed tasks, generate commit messages.
+2. Commit: Collect all CommitMessages, commit them in a global node 
('Coordinator', or named 'Driver', or named 'Committer').
+
+```java
+import java.util.List;
+
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+
+public class WriteTable {
+
+    public static void main(String[] args) {
+        // 1. Create a WriteBuilder (Serializable)
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder()
+                .withOverwrite(staticPartition);
+
+        // 2. Write records in distributed tasks
+        BatchTableWrite write = writeBuilder.newWrite();
+        write.write(record1);
+        write.write(record2);
+        write.write(record3);
+        List<CommitMessage> messages = write.prepareCommit();
+
+        // 3. Collect all CommitMessages to a global node and commit
+        BatchTableCommit commit = writeBuilder.newCommit();
+        commit.commit(messages);
+    }
+}
+```
+
+## Stream Read
+
+The difference of Stream Read is that StreamTableScan can continuously scan 
and generate splits.
+
+StreamTableScan provides the ability to checkpoint and restore, which can let 
you save the correct state
+during stream reading.
+
+```java
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableRead;
+
+public class StreamReadTable {
+
+    public static void main(String[] args) throws IOException {
+        // 1. Create a ReadBuilder and push filter (`withFilter`)
+        // and projection (`withProjection`) if necessary
+        ReadBuilder readBuilder = table.newReadBuilder()
+                .withProjection(projection)
+                .withFilter(filter);
+
+        // 2. Plan splits in 'Coordinator' (or named 'Driver')
+        StreamTableScan scan = readBuilder.newStreamScan();
+        while (true) {
+            List<Split> splits = scan.plan().splits();
+            // Distribute these splits to different tasks
+
+            Long state = scan.checkpoint();
+            // can be restored in scan.restore(state) after failover
+        }
+
+        // 3. Read a split in task
+        TableRead read = readBuilder.newRead();
+        RecordReader<InternalRow> reader = read.createReader(splits);
+        reader.forEachRemaining(row -> System.out.println(row));
+    }
+}
+```
+
+## Stream Write
+
+The difference of Stream Write is that StreamTableCommit can continuously 
commit.
+
+Key points to achieve exactly-once consistency:
+
+- CommitUser represents a user. A user can commit multiple times. In 
distributed processing, you are
+  expected to use the same commitUser.
+- Different applications need to use different commitUsers.
+- The commitIdentifier of `StreamTableWrite` and `StreamTableCommit` needs to 
be consistent, and the
+  id needs to be incremented for the next committing.
+- When a failure occurs, if you still have uncommitted `CommitMessage`s, 
please use `StreamTableCommit#filterCommitted`
+  to exclude the committed messages by commitIdentifier.
+
+```java
+import java.util.List;
+
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+
+public class StreamWriteTable {
+
+    public static void main(String[] args) throws Exception {
+        // 1. Create a WriteBuilder (Serializable)
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
+
+        // 2. Write records in distributed tasks
+        StreamTableWrite write = writeBuilder.newWrite();
+        // commitIdentifier like Flink checkpointId
+        long commitIdentifier = 0;
+        while (true) {
+            write.write(record1);
+            write.write(record2);
+            write.write(record3);
+            List<CommitMessage> messages = write.prepareCommit(
+                    false, commitIdentifier);
+            commitIdentifier++;
+        }
+
+        // 3. Collect all CommitMessages to a global node and commit
+        StreamTableCommit commit = writeBuilder.newCommit();
+        commit.commit(commitIdentifier, messages);
+
+        // 4. When failover, you can use 'filterCommitted' to filter committed 
commits.
+        commit.filterCommitted(committedIdentifiers);
+    }
+}
+```
diff --git a/docs/content/maintenance/configurations.md 
b/docs/content/maintenance/configurations.md
index fdc584386..e34a33260 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/maintenance/configurations.md
@@ -3,7 +3,7 @@ title: "Configurations"
 weight: 10
 type: docs
 aliases:
-- /development/configurations.html
+- /maintenance/configurations.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
index fbfcd250f..ee0091a7b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
@@ -37,14 +37,14 @@ import java.util.Map;
  * WriteBuilder builder = table.newWriteBuilder();
  *
  * // 2. Write records in distributed tasks
- * TableWrite write = builder.newWrite();
+ * BatchTableWrite write = builder.newWrite();
  * write.write(...);
  * write.write(...);
  * write.write(...);
  * List<CommitMessage> messages = write.prepareCommit();
  *
  * // 3. Collect all CommitMessages to a global node and commit
- * TableCommit commit = builder.newCommit();
+ * BatchTableCommit commit = builder.newCommit();
  * commit.commit(allCommitMessages());
  * }</pre>
  *

Reply via email to