This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9226783fe [doc] Update docs in benchmark and java api (#1957)
9226783fe is described below

commit 9226783fed4878a5edeb71e580fe6607d82e2c16
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Sep 7 11:23:34 2023 +0800

    [doc] Update docs in benchmark and java api (#1957)
---
 docs/content/api/java-api.md                       | 340 ++++++++++++---------
 docs/content/concepts/file-operations.md           |  11 +-
 docs/content/concepts/primary-key-table.md         |   4 +-
 .../paimon-cluster-benchmark/README.md             |  16 +-
 .../org/apache/paimon/benchmark/Benchmark.java     |  35 ++-
 .../src/main/resources/bin/config.sh               |   2 +-
 6 files changed, 247 insertions(+), 161 deletions(-)

diff --git a/docs/content/api/java-api.md b/docs/content/api/java-api.md
index ed72d6b01..7b4a16a49 100644
--- a/docs/content/api/java-api.md
+++ b/docs/content/api/java-api.md
@@ -57,12 +57,12 @@ import org.apache.paimon.options.Options;
 
 public class CreateCatalog {
 
-    public static void createFilesystemCatalog() {
+    public static Catalog createFilesystemCatalog() {
         CatalogContext context = CatalogContext.create(new Path("..."));
-        Catalog catalog = CatalogFactory.createCatalog(context);
+        return CatalogFactory.createCatalog(context);
     }
 
-    public static void createHiveCatalog() {
+    public static Catalog createHiveCatalog() {
         // Paimon Hive catalog relies on Hive jars
         // You should add hive classpath or hive bundled jar.
         Options options = new Options();
@@ -72,7 +72,7 @@ public class CreateCatalog {
         options.set("hive-conf-dir", "...");
         options.set("hadoop-conf-dir", "...");
         CatalogContext context = CatalogContext.create(options);
-        Catalog catalog = CatalogFactory.createCatalog(context);
+        return CatalogFactory.createCatalog(context);
     }
 }
 ```
@@ -83,17 +83,17 @@ You can use the catalog to create databases. The created databases are persisten
 
 ```java
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.fs.Path;
 
 public class CreateDatabase {
 
-     public static void main(String[] args) {
-            try {
-                catalog.createDatabase("my_db", false);
-            } catch (Catalog.DatabaseAlreadyExistException e) {
-                // do something
-            }
+    public static void main(String[] args) {
+        try {
+            Catalog catalog = CreateCatalog.createFilesystemCatalog();
+            catalog.createDatabase("my_db", false);
+        } catch (Catalog.DatabaseAlreadyExistException e) {
+            // do something
         }
+    }
 }
 ```
 
@@ -103,13 +103,13 @@ You can use the catalog to determine whether the database exists
 
 ```java
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.fs.Path;
 
 public class DatabaseExists {
 
-     public static void main(String[] args) {
-             boolean exists = catalog.databaseExists("my_db");
-        }
+    public static void main(String[] args) {
+        Catalog catalog = CreateCatalog.createFilesystemCatalog();
+        boolean exists = catalog.databaseExists("my_db");
+    }
 }
 ```
 
@@ -119,13 +119,15 @@ You can use the catalog to list databases.
 
 ```java
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.fs.Path;
+
+import java.util.List;
 
 public class ListDatabases {
 
-     public static void main(String[] args) {
-            List<String> databases = catalog.listDatabases();
-        }
+    public static void main(String[] args) {
+        Catalog catalog = CreateCatalog.createFilesystemCatalog();
+        List<String> databases = catalog.listDatabases();
+    }
 }
 ```
 
@@ -135,21 +137,19 @@ You can use the catalog to drop databases.
 
 ```java
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.fs.Path;
 
 public class DropDatabase {
 
-     public static void main(String[] args) {
-            try {
-                catalog.dropDatabase("my_db", false, true);
-            } catch (Catalog.DatabaseNotEmptyException e) {
-                // do something
-            } catch (Catalog.DatabaseNotExistException e) {
-                // do something
-            }
+    public static void main(String[] args) {
+        try {
+            Catalog catalog = CreateCatalog.createFilesystemCatalog();
+            catalog.dropDatabase("my_db", false, true);
+        } catch (Catalog.DatabaseNotEmptyException e) {
+            // do something
+        } catch (Catalog.DatabaseNotExistException e) {
+            // do something
         }
+    }
 }
 ```
 
@@ -159,7 +159,6 @@ You can use the catalog to create tables. The created tables are persistence in
 Next time you can directly obtain these tables.
 
 ```java
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.schema.Schema;
@@ -169,14 +168,15 @@ public class CreateTable {
 
     public static void main(String[] args) {
         Schema.Builder schemaBuilder = Schema.newBuilder();
-        schemaBuilder.primaryKey("...");
-        schemaBuilder.partitionKeys("...");
-        schemaBuilder.column("f0", DataTypes.INT());
-        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.primaryKey("f0", "f1");
+        schemaBuilder.partitionKeys("f1");
+        schemaBuilder.column("f0", DataTypes.STRING());
+        schemaBuilder.column("f1", DataTypes.INT());
         Schema schema = schemaBuilder.build();
 
         Identifier identifier = Identifier.create("my_db", "my_table");
         try {
+            Catalog catalog = CreateCatalog.createFilesystemCatalog();
             catalog.createTable(identifier, schema, false);
         } catch (Catalog.TableAlreadyExistException e) {
             // do something
@@ -192,18 +192,20 @@ public class CreateTable {
 The `Table` interface provides access to the table metadata and tools to read and write table.
 
 ```java
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
 
 public class GetTable {
 
-    public static void main(String[] args) {
+    public static Table getTable() {
         Identifier identifier = Identifier.create("my_db", "my_table");
         try {
-            Table table = catalog.getTable(identifier);
+            Catalog catalog = CreateCatalog.createFilesystemCatalog();
+            return catalog.getTable(identifier);
         } catch (Catalog.TableNotExistException e) {
             // do something
+            throw new RuntimeException("table not exist");
         }
     }
 }
@@ -214,14 +216,14 @@ public class GetTable {
 You can use the catalog to determine whether the table exists
 
 ```java
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 
 public class TableExists {
 
- public static void main(String[] args) {
+    public static void main(String[] args) {
         Identifier identifier = Identifier.create("my_db", "my_table");
+        Catalog catalog = CreateCatalog.createFilesystemCatalog();
         boolean exists = catalog.tableExists(identifier);
     }
 }
@@ -232,14 +234,16 @@ public class TableExists {
 You can use the catalog to list tables.
 
 ```java
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.catalog.Catalog;
 
+import java.util.List;
+
 public class ListTables {
 
- public static void main(String[] args) {
+    public static void main(String[] args) {
         try {
-            catalog.listTables("my_db");
+            Catalog catalog = CreateCatalog.createFilesystemCatalog();
+            List<String> tables = catalog.listTables("my_db");
         } catch (Catalog.DatabaseNotExistException e) {
             // do something
         }
@@ -252,7 +256,6 @@ public class ListTables {
 You can use the catalog to drop table.
 
 ```java
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 
@@ -261,6 +264,7 @@ public class DropTable {
     public static void main(String[] args) {
         Identifier identifier = Identifier.create("my_db", "my_table");
         try {
+            Catalog catalog = CreateCatalog.createFilesystemCatalog();
             catalog.dropTable(identifier, false);
         } catch (Catalog.TableNotExistException e) {
             // do something
@@ -274,7 +278,6 @@ public class DropTable {
 You can use the catalog to rename a table.
 
 ```java
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 
@@ -284,6 +287,7 @@ public class RenameTable {
         Identifier fromTableIdentifier = Identifier.create("my_db", "my_table");
         Identifier toTableIdentifier = Identifier.create("my_db", "test_table");
         try {
+            Catalog catalog = CreateCatalog.createFilesystemCatalog();
             catalog.renameTable(fromTableIdentifier, toTableIdentifier, false);
         } catch (Catalog.TableAlreadyExistException e) {
             // do something
@@ -305,48 +309,61 @@ You can use the catalog to alter a table, but you need to pay attention to the f
 - Update column to nested row type is not supported.
 
 ```java
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
-import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
+
 import com.google.common.collect.Lists;
+
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 public class AlterTable {
 
     public static void main(String[] args) {
         Identifier identifier = Identifier.create("my_db", "my_table");
-       
-        Map<String,String> options = new HashMap<>();
+
+        Map<String, String> options = new HashMap<>();
         options.put("bucket", "4");
         options.put("compaction.max.file-num", "40");
-       
-        catalog.createTable(
-            identifier,
-                  new Schema(
-                      Lists.newArrayList(
-                          new DataField(0, "col1", DataTypes.STRING(), "field1"),
-                          new DataField(1, "col2", DataTypes.STRING(), "field2"),
-                          new DataField(2, "col3", DataTypes.STRING(), "field3"),
-                          new DataField(3, "col4", DataTypes.BIGINT(), "field4"),
-                          new DataField(
-                              4,
-                              "col5",
-                              DataTypes.ROW(
-                                  new DataField(5, "f1", DataTypes.STRING(), "f1"),
-                                  new DataField(6, "f2", DataTypes.STRING(), "f2"),
-                                  new DataField(7, "f3", DataTypes.STRING(), "f3")),
-                              "field5"),
-                          new DataField(8, "col6", DataTypes.STRING(), "field6")),
-                      Lists.newArrayList("col1"), // partition keys
-                      Lists.newArrayList("col1", "col2"),  //primary key
-                      options,
-                      "table comment"),
-                  false);
-       
+
+        Catalog catalog = CreateCatalog.createFilesystemCatalog();
+        try {
+            catalog.createTable(
+                identifier,
+                new Schema(
+                    Lists.newArrayList(
+                        new DataField(0, "col1", DataTypes.STRING(), "field1"),
+                        new DataField(1, "col2", DataTypes.STRING(), "field2"),
+                        new DataField(2, "col3", DataTypes.STRING(), "field3"),
+                        new DataField(3, "col4", DataTypes.BIGINT(), "field4"),
+                        new DataField(
+                            4,
+                            "col5",
+                            DataTypes.ROW(
+                                new DataField(
+                                    5, "f1", DataTypes.STRING(), "f1"),
+                                new DataField(
+                                    6, "f2", DataTypes.STRING(), "f2"),
+                                new DataField(
+                                    7, "f3", DataTypes.STRING(), "f3")),
+                            "field5"),
+                        new DataField(8, "col6", DataTypes.STRING(), "field6")),
+                    Lists.newArrayList("col1"), // partition keys
+                    Lists.newArrayList("col1", "col2"), // primary key
+                    options,
+                    "table comment"),
+                false);
+        } catch (Catalog.TableAlreadyExistException e) {
+            // do something
+        } catch (Catalog.DatabaseNotExistException e) {
+            // do something
+        }
+
         // add option
         SchemaChange addOption = SchemaChange.setOption("snapshot.time-retained", "2h");
         // remove option
@@ -355,40 +372,54 @@ public class AlterTable {
         SchemaChange addColumn = SchemaChange.addColumn("col1_after", DataTypes.STRING());
         // add a column after col1
         SchemaChange.Move after = SchemaChange.Move.after("col1_after", "col1");
-        SchemaChange addColumnAfterField = 
+        SchemaChange addColumnAfterField =
             SchemaChange.addColumn("col7", DataTypes.STRING(), "", after);
         // rename column
         SchemaChange renameColumn = SchemaChange.renameColumn("col3", "col3_new_name");
         // drop column
         SchemaChange dropColumn = SchemaChange.dropColumn("col6");
         // update column comment
-        SchemaChange updateColumnComment = SchemaChange.updateColumnComment(new String[]{"col4"}, "col4 field");
+        SchemaChange updateColumnComment =
+            SchemaChange.updateColumnComment(new String[] {"col4"}, "col4 field");
         // update nested column comment
         SchemaChange updateNestedColumnComment =
-        SchemaChange.updateColumnComment(new String[]{"col5", "f1"}, "col5 f1 field");
+            SchemaChange.updateColumnComment(new String[] {"col5", "f1"}, "col5 f1 field");
         // update column type
         SchemaChange updateColumnType = SchemaChange.updateColumnType("col4", DataTypes.DOUBLE());
         // update column position, you need to pass in a parameter of type Move
-        SchemaChange updateColumnPosition = SchemaChange.updateColumnPosition(SchemaChange.Move.first("col4"));
+        SchemaChange updateColumnPosition =
+            SchemaChange.updateColumnPosition(SchemaChange.Move.first("col4"));
         // update column nullability
-        SchemaChange updateColumnNullability = SchemaChange.updateColumnNullability(new String[]{"col4"}, false);
+        SchemaChange updateColumnNullability =
+            SchemaChange.updateColumnNullability(new String[] {"col4"}, false);
         // update nested column nullability
-        SchemaChange updateNestedColumnNullability = 
-            SchemaChange.updateColumnNullability(new String[]{"col5", "f2"}, false);
-
-        SchemaChange[] schemaChanges = new SchemaChange[] {
-            addOption, removeOption, addColumn, addColumnAfterField, renameColumn, dropColumn,
-            updateColumnComment, updateNestedColumnComment, updateColumnType, updateColumnPosition,
-            updateColumnNullability, updateNestedColumnNullability};
+        SchemaChange updateNestedColumnNullability =
+            SchemaChange.updateColumnNullability(new String[] {"col5", "f2"}, false);
+
+        SchemaChange[] schemaChanges =
+            new SchemaChange[] {
+                addOption,
+                removeOption,
+                addColumn,
+                addColumnAfterField,
+                renameColumn,
+                dropColumn,
+                updateColumnComment,
+                updateNestedColumnComment,
+                updateColumnType,
+                updateColumnPosition,
+                updateColumnNullability,
+                updateNestedColumnNullability
+            };
         try {
             catalog.alterTable(identifier, Arrays.asList(schemaChanges), false);
         } catch (Catalog.TableNotExistException e) {
             // do something
-        } catch (Catalog.TableAlreadyExistException e) {
+        } catch (Catalog.ColumnAlreadyExistException e) {
             // do something
-        } catch (Catalog.DatabaseNotExistException e) {
+        } catch (Catalog.ColumnNotExistException e) {
             // do something
-        } 
+        }
     }
 }
 ```
@@ -417,30 +448,36 @@ The reading is divided into two stages:
 
 ```java
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
 import com.google.common.collect.Lists;
 
-import java.io.IOException;
 import java.util.List;
 
 public class ReadTable {
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws Exception {
         // 1. Create a ReadBuilder and push filter (`withFilter`)
         // and projection (`withProjection`) if necessary
-        // [{"Alice", 12},{"Bob", 5},{"Emily", 18}]
-         PredicateBuilder builder = 
-            new PredicateBuilder(RowType.of(DataTypes.STRING(),DataTypes.INT()));
-         Predicate notNull = builder.isNotNull(0);
-         Predicate greaterOrEqual = builder.greaterOrEqual(1, 12);
+        Table table = GetTable.getTable();
 
-        ReadBuilder readBuilder = table.newReadBuilder()
+        PredicateBuilder builder =
+            new PredicateBuilder(RowType.of(DataTypes.STRING(), DataTypes.INT()));
+        Predicate notNull = builder.isNotNull(0);
+        Predicate greaterOrEqual = builder.greaterOrEqual(1, 12);
+
+        int[] projection = new int[] {0, 1};
+
+        ReadBuilder readBuilder =
+            table.newReadBuilder()
                 .withProjection(projection)
                 .withFilter(Lists.newArrayList(notNull, greaterOrEqual));
 
@@ -452,7 +489,7 @@ public class ReadTable {
         // 4. Read a split in task
         TableRead read = readBuilder.newRead();
         RecordReader<InternalRow> reader = read.createReader(splits);
-        reader.forEachRemaining(ReadTable::readRow);
+        reader.forEachRemaining(System.out::println);
     }
 }
 ```
@@ -466,21 +503,21 @@ The writing is divided into two stages:
    When the commit fails for certain reason, abort unsuccessful commit via CommitMessages.
 
 ```java
-import java.util.List;
-
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
 
-public class WriteTable {
+import java.util.List;
 
-    public static void main(String[] args) {
+public class BatchWrite {
+    public static void main(String[] args) throws Exception {
         // 1. Create a WriteBuilder (Serializable)
-        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder()
-                .withOverwrite(staticPartition);
+        Table table = GetTable.getTable();
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder().withOverwrite();
 
         // 2. Write records in distributed tasks
         BatchTableWrite write = writeBuilder.newWrite();
@@ -498,7 +535,7 @@ public class WriteTable {
         // 3. Collect all CommitMessages to a global node and commit
         BatchTableCommit commit = writeBuilder.newCommit();
         commit.commit(messages);
-        
+
         // Abort unsuccessful commit to delete data files
         // commit.abort(messages);
     }
@@ -513,24 +550,40 @@ StreamTableScan provides the ability to checkpoint and restore, which can let yo
 during stream reading.
 
 ```java
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
 
 public class StreamReadTable {
 
-    public static void main(String[] args) throws IOException {
+    public static void main(String[] args) throws Exception {
         // 1. Create a ReadBuilder and push filter (`withFilter`)
         // and projection (`withProjection`) if necessary
-        ReadBuilder readBuilder = table.newReadBuilder()
+        Table table = GetTable.getTable();
+
+        PredicateBuilder builder =
+            new PredicateBuilder(RowType.of(DataTypes.STRING(), DataTypes.INT()));
+        Predicate notNull = builder.isNotNull(0);
+        Predicate greaterOrEqual = builder.greaterOrEqual(1, 12);
+
+        int[] projection = new int[] {0, 1};
+
+        ReadBuilder readBuilder =
+            table.newReadBuilder()
                 .withProjection(projection)
-                .withFilter(filter);
+                .withFilter(Lists.newArrayList(notNull, greaterOrEqual));
 
         // 2. Plan splits in 'Coordinator' (or named 'Driver')
         StreamTableScan scan = readBuilder.newStreamScan();
@@ -539,13 +592,15 @@ public class StreamReadTable {
             // Distribute these splits to different tasks
 
             Long state = scan.checkpoint();
-            // can be restored in scan.restore(state) after failover
-        }
+            // can be restored in scan.restore(state) after fail over
 
-        // 3. Read a split in task
-        TableRead read = readBuilder.newRead();
-        RecordReader<InternalRow> reader = read.createReader(splits);
-        reader.forEachRemaining(row -> System.out.println(row));
+            // 3. Read a split in task
+            TableRead read = readBuilder.newRead();
+            RecordReader<InternalRow> reader = read.createReader(splits);
+            reader.forEachRemaining(System.out::println);
+
+            Thread.sleep(1000);
+        }
     }
 }
 ```
@@ -565,44 +620,53 @@ Key points to achieve exactly-once consistency:
   to exclude the committed messages by commitIdentifier.
 
 ```java
-import java.util.List;
-
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 
+import java.util.List;
+
 public class StreamWriteTable {
 
     public static void main(String[] args) throws Exception {
         // 1. Create a WriteBuilder (Serializable)
+        Table table = GetTable.getTable();
         StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
 
         // 2. Write records in distributed tasks
         StreamTableWrite write = writeBuilder.newWrite();
         // commitIdentifier like Flink checkpointId
         long commitIdentifier = 0;
+
         while (true) {
+            GenericRow record1 = GenericRow.of(BinaryString.fromString("Alice"), 12);
+            GenericRow record2 = GenericRow.of(BinaryString.fromString("Bob"), 5);
+            GenericRow record3 = GenericRow.of(BinaryString.fromString("Emily"), 18);
             write.write(record1);
             write.write(record2);
             write.write(record3);
-            List<CommitMessage> messages = write.prepareCommit(
-                    false, commitIdentifier);
+            List<CommitMessage> messages = write.prepareCommit(false, commitIdentifier);
             commitIdentifier++;
-        }
 
-        // 3. Collect all CommitMessages to a global node and commit
-        StreamTableCommit commit = writeBuilder.newCommit();
-        commit.commit(commitIdentifier, messages);
-
-        // 4. When failure occurs and you're not sure if the commit process is successful,
-        //    you can use `filterAndCommit` to retry the commit process.
-        //    Succeeded commits will be automatically skipped.
-        /*
-        Map<Long, List<CommitMessage>> commitIdentifiersAndMessages = new HashMap<>();
-        commitIdentifiersAndMessages.put(commitIdentifier, messages);
-        commit.filterAndCommit(commitIdentifiersAndMessages);
-        */
+            // 3. Collect all CommitMessages to a global node and commit
+            StreamTableCommit commit = writeBuilder.newCommit();
+            commit.commit(commitIdentifier, messages);
+
+            // 4. When failure occurs and you're not sure if the commit process is successful,
+            //    you can use `filterAndCommit` to retry the commit process.
+            //    Succeeded commits will be automatically skipped.
+            /*
+            Map<Long, List<CommitMessage>> commitIdentifiersAndMessages = new HashMap<>();
+            commitIdentifiersAndMessages.put(commitIdentifier, messages);
+            commit.filterAndCommit(commitIdentifiersAndMessages);
+            */
+
+            Thread.sleep(1000);
+        }
     }
 }
 ```
@@ -642,5 +706,3 @@ public class StreamWriteTable {
 | <=           | org.apache.paimon.predicate.PredicateBuilder.LessOrEqual     |
 | >            | org.apache.paimon.predicate.PredicateBuilder.GreaterThan     |
 | >=           | org.apache.paimon.predicate.PredicateBuilder.GreaterOrEqual  |
-
-
diff --git a/docs/content/concepts/file-operations.md b/docs/content/concepts/file-operations.md
index 58696d836..afcd7515b 100644
--- a/docs/content/concepts/file-operations.md
+++ b/docs/content/concepts/file-operations.md
@@ -155,7 +155,7 @@ The second `commit` takes place and executing `SELECT * FROM T` will return
 10 rows. A new snapshot, namely `snapshot-2`, is created and gives us the following physical file layout:
 ```bash
- % ls -atR . 
+ % ls -1tR . 
 ./T:
 dt=20230501
 dt=20230502    
@@ -178,12 +178,12 @@ EARLIEST
 snapshot-1
 
 ./T/manifest:
-manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-1        # delta manifest list for snapshot-2
-manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-0  # base manifest list for snapshot-2
+manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-1 # delta manifest list for snapshot-2
+manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-0 # base manifest list for snapshot-2
 manifest-f1267033-e246-4470-a54c-5c27fdbdd074-0         # manifest file for snapshot-2
 
-manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1        # delta manifest list for snapshot-1
-manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0  # base manifest list for snapshot-1
+manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1 # delta manifest list for snapshot-1
+manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0 # base manifest list for snapshot-1
 manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0  # manifest file for snapshot-1
 
 ./T/dt=20230501/bucket-0:
@@ -245,6 +245,7 @@ Let's trigger the full-compaction now, and run a dedicated compaction job throug
 {{< label Batch >}}
 ```bash  
 <FLINK_HOME>/bin/flink run \
+    -D execution.runtime-mode=batch \
     /path/to/paimon-flink-action-{{< version >}}.jar \
     compact \
     --warehouse <warehouse-path> \
diff --git a/docs/content/concepts/primary-key-table.md b/docs/content/concepts/primary-key-table.md
index 06153616f..cb47455e9 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -269,11 +269,11 @@ This is of great help in replacing log deduplication in streaming computation.
 
 Streaming queries will continuously produce the latest changes. These changes can come from the underlying table files or from an [external log system]({{< ref "concepts/external-log-systems" >}}) like Kafka. Compared to the external log system, changes from table files have lower cost but higher latency (depending on how often snapshots are created).
 
-By specifying the `changelog-producer` table property when creating the table, users can choose the pattern of changes produced from files.
+By specifying the `changelog-producer` table property when creating the table, users can choose the pattern of changes produced from table files.
 
 {{< hint info >}}
 
-The `changelog-producer` table property only affects changelog from files. It does not affect the external log system.
+The `changelog-producer` table property only affects changelog from table files. It does not affect the external log system.
 
 {{< /hint >}}
 
diff --git a/paimon-benchmark/paimon-cluster-benchmark/README.md b/paimon-benchmark/paimon-cluster-benchmark/README.md
index 2907639c1..c67a5597d 100644
--- a/paimon-benchmark/paimon-cluster-benchmark/README.md
+++ b/paimon-benchmark/paimon-cluster-benchmark/README.md
@@ -7,7 +7,7 @@ This is the cluster benchmark module for Paimon. Inspired by [Nexmark](https://g
 * This benchmark only runs on Linux. You'll need a Linux environment (preferably an EMR cluster). For a more reasonable result, we recommend this cluster to have:
   * One master node with 8 cores and 16GB RAM.
   * Two worker nodes with 16 cores and 64GB RAM.
-* This benchmark runs on a standalone Flink cluster. Download Flink >= 1.15 from the [Apache Flink's website](https://flink.apache.org/downloads.html#apache-flink-1160) and setup a standalone cluster. Flink's job manager must be on the master node of your EMR cluster. We recommend the following Flink configurations:
+* This benchmark runs on a standalone Flink cluster. Download Flink >= 1.15 from the [Apache Flink's website](https://flink.apache.org/downloads.html#apache-flink-1160) and set up a standalone cluster. Flink's job manager must be on the master node of your EMR cluster. We recommend the following Flink configurations:
     ```yaml
     jobmanager.memory.process.size: 4096m
     taskmanager.memory.process.size: 4096m
@@ -21,13 +21,13 @@ This is the cluster benchmark module for Paimon. Inspired by [Nexmark](https://g
 * This benchmark needs the `FLINK_HOME` environment variable. Set `FLINK_HOME` to your Flink directory.
 
 ### Setup Benchmark
-* Build this module with command `mvn clean package`.
-* Copy `target/paimon-benchmark-bin/paimon-benchmark` to the master node of your EMR cluster.
-* Modify `paimon-benchmark/conf/benchmark.yaml` on the master node. You must change these config options:
-  * `benchmark.metric.reporter.host` and `flink.rest.address`: set these to the address of master node of your EMR cluster.
-  *  `benchmark.sink.path` is the path to which queries insert records. This should point to a non-existing path. Contents of this path will be removed before each test.
-* Copy `paimon-benchmark` to every worker node of your EMR cluster.
-* Run `paimon-benchmark/bin/setup_cluster.sh` in master node. This activates the CPU metrics collector in worker nodes. Note that if you restart your Flink cluster, you must also restart the CPU metrics collectors. To stop CPU metrics collectors, run `paimon-benchmark/bin/shutdown_cluster.sh` in master node.
+1. Build this module with command `mvn clean package -DskipTests -pl paimon-benchmark/paimon-cluster-benchmark/ -am`.
+2. Copy `paimon-benchmark/paimon-cluster-benchmark/target/paimon-benchmark-bin/paimon-benchmark` to the master node of your EMR cluster.
+3. Modify `paimon-benchmark/conf/benchmark.yaml` on the master node. You must change these config options:
+   * `benchmark.metric.reporter.host` and `flink.rest.address`: set these to the address of master node of your EMR cluster.
+   * `benchmark.sink.path` is the path to which queries insert records. This should point to a non-existing path. Contents of this path will be removed before each test.
+4. Copy `paimon-benchmark` to every worker node of your EMR cluster.
+5. Run `paimon-benchmark/bin/setup_cluster.sh` in master node. This activates the CPU metrics collector in worker nodes. Note that if you restart your Flink cluster, you must also restart the CPU metrics collectors. To stop CPU metrics collectors, run `paimon-benchmark/bin/shutdown_cluster.sh` in master node.
 
 ### Run Benchmark
 * Run `paimon-benchmark/bin/run_benchmark.sh <query> <sink>` to run `<query>` for `<sink>`. Currently `<query>` can be `q1` or `all`, and sink can only be `paimon`.
diff --git a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/Benchmark.java b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/Benchmark.java
index c5f137a9f..e3e2b4027 100644
--- a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/Benchmark.java
+++ b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/Benchmark.java
@@ -33,9 +33,11 @@ import org.apache.flink.configuration.Configuration;
 import java.io.File;
 import java.nio.file.Path;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /** The entry point to run benchmark. */
@@ -59,7 +61,7 @@ public class Benchmark {
     public static void main(String[] args) throws Exception {
         if (args.length != 6) {
             throw new RuntimeException(
-                    "Usage: --location /path/to/benchmark --queries q1,q3 --sinks paimon,hudi_merge_on_read");
+                    "Usage: --location /path/to/benchmark --queries q1 --sinks paimon");
         }
 
         Options options = getOptions();
@@ -71,20 +73,41 @@ public class Benchmark {
         String queriesValue = line.getOptionValue(QUERIES.getOpt());
         List<Query> queries = Query.load(location);
         if (!"all".equalsIgnoreCase(queriesValue)) {
-            List<String> wantedQueries =
+            Set<String> wantedQueries =
                     Arrays.stream(queriesValue.split(","))
                             .map(String::trim)
-                            .collect(Collectors.toList());
+                            .collect(Collectors.toSet());
+            Set<String> supportedQueries =
+                    queries.stream().map(Query::name).collect(Collectors.toSet());
+            Set<String> unSupportedQueries = new HashSet<>(wantedQueries);
+            unSupportedQueries.removeAll(supportedQueries);
+            if (!unSupportedQueries.isEmpty()) {
+                throw new RuntimeException(
+                        "Unsupported queries: "
+                                + unSupportedQueries
+                                + ", current supported queries are "
+                                + supportedQueries);
+            }
             queries.removeIf(q -> !wantedQueries.contains(q.name()));
         }
 
         String sinksValue = line.getOptionValue(SINKS.getOpt());
         List<Sink> sinks = Sink.load(location);
         if (!"all".equalsIgnoreCase(sinksValue)) {
-            List<String> wantedSinks =
+            Set<String> wantedSinks =
                     Arrays.stream(sinksValue.split(","))
                             .map(String::trim)
-                            .collect(Collectors.toList());
+                            .collect(Collectors.toSet());
+            Set<String> supportedSinks = sinks.stream().map(Sink::name).collect(Collectors.toSet());
+            Set<String> unSupportedSinks = new HashSet<>(wantedSinks);
+            unSupportedSinks.removeAll(supportedSinks);
+            if (!unSupportedSinks.isEmpty()) {
+                throw new RuntimeException(
+                        "Unsupported sinks: "
+                                + unSupportedSinks
+                                + ", current supported sinks are "
+                                + supportedSinks);
+            }
             sinks.removeIf(s -> !wantedSinks.contains(s.name()));
         }
 
@@ -212,6 +235,6 @@ public class Benchmark {
             }
         }
         builder.append(separator);
-        System.err.println(builder.toString());
+        System.err.println(builder);
     }
 }
diff --git a/paimon-benchmark/paimon-cluster-benchmark/src/main/resources/bin/config.sh b/paimon-benchmark/paimon-cluster-benchmark/src/main/resources/bin/config.sh
index 65dc0c7b1..4fe71a0c0 100644
--- a/paimon-benchmark/paimon-cluster-benchmark/src/main/resources/bin/config.sh
+++ b/paimon-benchmark/paimon-cluster-benchmark/src/main/resources/bin/config.sh
@@ -47,7 +47,7 @@ export BENCHMARK_LOG_DIR
 export BENCHMARK_CONF_DIR
 export BENCHMARK_BIN_DIR
 
-# Auxilliary function which extracts the name of host from a line which
+# Auxiliary function which extracts the name of host from a line which
 # also potentially includes topology information and the taskManager type
 extractHostName() {
     # handle comments: extract first part of string (before first # character)

