This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9226783fe [doc] Update docs in benchmark and java api (#1957)
9226783fe is described below
commit 9226783fed4878a5edeb71e580fe6607d82e2c16
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Sep 7 11:23:34 2023 +0800
[doc] Update docs in benchmark and java api (#1957)
---
docs/content/api/java-api.md | 340 ++++++++++++---------
docs/content/concepts/file-operations.md | 11 +-
docs/content/concepts/primary-key-table.md | 4 +-
.../paimon-cluster-benchmark/README.md | 16 +-
.../org/apache/paimon/benchmark/Benchmark.java | 35 ++-
.../src/main/resources/bin/config.sh | 2 +-
6 files changed, 247 insertions(+), 161 deletions(-)
diff --git a/docs/content/api/java-api.md b/docs/content/api/java-api.md
index ed72d6b01..7b4a16a49 100644
--- a/docs/content/api/java-api.md
+++ b/docs/content/api/java-api.md
@@ -57,12 +57,12 @@ import org.apache.paimon.options.Options;
public class CreateCatalog {
- public static void createFilesystemCatalog() {
+ public static Catalog createFilesystemCatalog() {
CatalogContext context = CatalogContext.create(new Path("..."));
- Catalog catalog = CatalogFactory.createCatalog(context);
+ return CatalogFactory.createCatalog(context);
}
- public static void createHiveCatalog() {
+ public static Catalog createHiveCatalog() {
// Paimon Hive catalog relies on Hive jars
// You should add hive classpath or hive bundled jar.
Options options = new Options();
@@ -72,7 +72,7 @@ public class CreateCatalog {
options.set("hive-conf-dir", "...");
options.set("hadoop-conf-dir", "...");
CatalogContext context = CatalogContext.create(options);
- Catalog catalog = CatalogFactory.createCatalog(context);
+ return CatalogFactory.createCatalog(context);
}
}
```
@@ -83,17 +83,17 @@ You can use the catalog to create databases. The created databases are persisten
```java
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.fs.Path;
public class CreateDatabase {
- public static void main(String[] args) {
- try {
- catalog.createDatabase("my_db", false);
- } catch (Catalog.DatabaseAlreadyExistException e) {
- // do something
- }
+ public static void main(String[] args) {
+ try {
+ Catalog catalog = CreateCatalog.createFilesystemCatalog();
+ catalog.createDatabase("my_db", false);
+ } catch (Catalog.DatabaseAlreadyExistException e) {
+ // do something
}
+ }
}
```
@@ -103,13 +103,13 @@ You can use the catalog to determine whether the database exists
```java
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.fs.Path;
public class DatabaseExists {
- public static void main(String[] args) {
- boolean exists = catalog.databaseExists("my_db");
- }
+ public static void main(String[] args) {
+ Catalog catalog = CreateCatalog.createFilesystemCatalog();
+ boolean exists = catalog.databaseExists("my_db");
+ }
}
```
@@ -119,13 +119,15 @@ You can use the catalog to list databases.
```java
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.fs.Path;
+
+import java.util.List;
public class ListDatabases {
- public static void main(String[] args) {
- List<String> databases = catalog.listDatabases();
- }
+ public static void main(String[] args) {
+ Catalog catalog = CreateCatalog.createFilesystemCatalog();
+ List<String> databases = catalog.listDatabases();
+ }
}
```
@@ -135,21 +137,19 @@ You can use the catalog to drop databases.
```java
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.fs.Path;
public class DropDatabase {
- public static void main(String[] args) {
- try {
- catalog.dropDatabase("my_db", false, true);
- } catch (Catalog.DatabaseNotEmptyException e) {
- // do something
- } catch (Catalog.DatabaseNotExistException e) {
- // do something
- }
+ public static void main(String[] args) {
+ try {
+ Catalog catalog = CreateCatalog.createFilesystemCatalog();
+ catalog.dropDatabase("my_db", false, true);
+ } catch (Catalog.DatabaseNotEmptyException e) {
+ // do something
+ } catch (Catalog.DatabaseNotExistException e) {
+ // do something
}
+ }
}
```
@@ -159,7 +159,6 @@ You can use the catalog to create tables. The created tables are persistence in
Next time you can directly obtain these tables.
```java
-import org.apache.paimon.fs.Path;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.Schema;
@@ -169,14 +168,15 @@ public class CreateTable {
public static void main(String[] args) {
Schema.Builder schemaBuilder = Schema.newBuilder();
- schemaBuilder.primaryKey("...");
- schemaBuilder.partitionKeys("...");
- schemaBuilder.column("f0", DataTypes.INT());
- schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.primaryKey("f0", "f1");
+ schemaBuilder.partitionKeys("f1");
+ schemaBuilder.column("f0", DataTypes.STRING());
+ schemaBuilder.column("f1", DataTypes.INT());
Schema schema = schemaBuilder.build();
Identifier identifier = Identifier.create("my_db", "my_table");
try {
+ Catalog catalog = CreateCatalog.createFilesystemCatalog();
catalog.createTable(identifier, schema, false);
} catch (Catalog.TableAlreadyExistException e) {
// do something
@@ -192,18 +192,20 @@ public class CreateTable {
The `Table` interface provides access to the table metadata and tools to read and write table.
```java
-import org.apache.paimon.fs.Path;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
public class GetTable {
- public static void main(String[] args) {
+ public static Table getTable() {
Identifier identifier = Identifier.create("my_db", "my_table");
try {
- Table table = catalog.getTable(identifier);
+ Catalog catalog = CreateCatalog.createFilesystemCatalog();
+ return catalog.getTable(identifier);
} catch (Catalog.TableNotExistException e) {
// do something
+ throw new RuntimeException("table not exist");
}
}
}
@@ -214,14 +216,14 @@ public class GetTable {
You can use the catalog to determine whether the table exists
```java
-import org.apache.paimon.fs.Path;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
public class TableExists {
- public static void main(String[] args) {
+ public static void main(String[] args) {
Identifier identifier = Identifier.create("my_db", "my_table");
+ Catalog catalog = CreateCatalog.createFilesystemCatalog();
boolean exists = catalog.tableExists(identifier);
}
}
@@ -232,14 +234,16 @@ public class TableExists {
You can use the catalog to list tables.
```java
-import org.apache.paimon.fs.Path;
import org.apache.paimon.catalog.Catalog;
+import java.util.List;
+
public class ListTables {
- public static void main(String[] args) {
+ public static void main(String[] args) {
try {
- catalog.listTables("my_db");
+ Catalog catalog = CreateCatalog.createFilesystemCatalog();
+ List<String> tables = catalog.listTables("my_db");
} catch (Catalog.DatabaseNotExistException e) {
// do something
}
@@ -252,7 +256,6 @@ public class ListTables {
You can use the catalog to drop table.
```java
-import org.apache.paimon.fs.Path;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
@@ -261,6 +264,7 @@ public class DropTable {
public static void main(String[] args) {
Identifier identifier = Identifier.create("my_db", "my_table");
try {
+ Catalog catalog = CreateCatalog.createFilesystemCatalog();
catalog.dropTable(identifier, false);
} catch (Catalog.TableNotExistException e) {
// do something
@@ -274,7 +278,6 @@ public class DropTable {
You can use the catalog to rename a table.
```java
-import org.apache.paimon.fs.Path;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
@@ -284,6 +287,7 @@ public class RenameTable {
Identifier fromTableIdentifier = Identifier.create("my_db", "my_table");
Identifier toTableIdentifier = Identifier.create("my_db", "test_table");
try {
+ Catalog catalog = CreateCatalog.createFilesystemCatalog();
catalog.renameTable(fromTableIdentifier, toTableIdentifier, false);
} catch (Catalog.TableAlreadyExistException e) {
// do something
@@ -305,48 +309,61 @@ You can use the catalog to alter a table, but you need to pay attention to the f
- Update column to nested row type is not supported.
```java
-import org.apache.paimon.fs.Path;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
-import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
+
import com.google.common.collect.Lists;
+
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
public class AlterTable {
public static void main(String[] args) {
Identifier identifier = Identifier.create("my_db", "my_table");
-
- Map<String,String> options = new HashMap<>();
+
+ Map<String, String> options = new HashMap<>();
options.put("bucket", "4");
options.put("compaction.max.file-num", "40");
-
- catalog.createTable(
- identifier,
- new Schema(
- Lists.newArrayList(
- new DataField(0, "col1", DataTypes.STRING(), "field1"),
- new DataField(1, "col2", DataTypes.STRING(), "field2"),
- new DataField(2, "col3", DataTypes.STRING(), "field3"),
- new DataField(3, "col4", DataTypes.BIGINT(), "field4"),
- new DataField(
- 4,
- "col5",
- DataTypes.ROW(
- new DataField(5, "f1", DataTypes.STRING(), "f1"),
- new DataField(6, "f2", DataTypes.STRING(), "f2"),
- new DataField(7, "f3", DataTypes.STRING(), "f3")),
- "field5"),
- new DataField(8, "col6", DataTypes.STRING(), "field6")),
- Lists.newArrayList("col1"), // partition keys
- Lists.newArrayList("col1", "col2"), //primary key
- options,
- "table comment"),
- false);
-
+
+ Catalog catalog = CreateCatalog.createFilesystemCatalog();
+ try {
+ catalog.createTable(
+ identifier,
+ new Schema(
+ Lists.newArrayList(
+ new DataField(0, "col1", DataTypes.STRING(), "field1"),
+ new DataField(1, "col2", DataTypes.STRING(), "field2"),
+ new DataField(2, "col3", DataTypes.STRING(), "field3"),
+ new DataField(3, "col4", DataTypes.BIGINT(), "field4"),
+ new DataField(
+ 4,
+ "col5",
+ DataTypes.ROW(
+ new DataField(
+ 5, "f1", DataTypes.STRING(), "f1"),
+ new DataField(
+ 6, "f2", DataTypes.STRING(), "f2"),
+ new DataField(
+ 7, "f3", DataTypes.STRING(), "f3")),
+ "field5"),
+ new DataField(8, "col6", DataTypes.STRING(), "field6")),
+ Lists.newArrayList("col1"), // partition keys
+ Lists.newArrayList("col1", "col2"), // primary key
+ options,
+ "table comment"),
+ false);
+ } catch (Catalog.TableAlreadyExistException e) {
+ // do something
+ } catch (Catalog.DatabaseNotExistException e) {
+ // do something
+ }
+
// add option
SchemaChange addOption =
SchemaChange.setOption("snapshot.time-retained", "2h");
// remove option
@@ -355,40 +372,54 @@ public class AlterTable {
SchemaChange addColumn = SchemaChange.addColumn("col1_after", DataTypes.STRING());
// add a column after col1
SchemaChange.Move after = SchemaChange.Move.after("col1_after", "col1");
- SchemaChange addColumnAfterField =
+ SchemaChange addColumnAfterField =
SchemaChange.addColumn("col7", DataTypes.STRING(), "", after);
// rename column
SchemaChange renameColumn = SchemaChange.renameColumn("col3", "col3_new_name");
// drop column
SchemaChange dropColumn = SchemaChange.dropColumn("col6");
// update column comment
- SchemaChange updateColumnComment = SchemaChange.updateColumnComment(new String[]{"col4"}, "col4 field");
+ SchemaChange updateColumnComment =
+ SchemaChange.updateColumnComment(new String[] {"col4"}, "col4 field");
// update nested column comment
SchemaChange updateNestedColumnComment =
- SchemaChange.updateColumnComment(new String[]{"col5", "f1"}, "col5 f1 field");
+ SchemaChange.updateColumnComment(new String[] {"col5", "f1"}, "col5 f1 field");
// update column type
SchemaChange updateColumnType = SchemaChange.updateColumnType("col4", DataTypes.DOUBLE());
// update column position, you need to pass in a parameter of type Move
- SchemaChange updateColumnPosition = SchemaChange.updateColumnPosition(SchemaChange.Move.first("col4"));
+ SchemaChange updateColumnPosition =
+ SchemaChange.updateColumnPosition(SchemaChange.Move.first("col4"));
// update column nullability
- SchemaChange updateColumnNullability = SchemaChange.updateColumnNullability(new String[]{"col4"}, false);
+ SchemaChange updateColumnNullability =
+ SchemaChange.updateColumnNullability(new String[] {"col4"}, false);
// update nested column nullability
- SchemaChange updateNestedColumnNullability =
- SchemaChange.updateColumnNullability(new String[]{"col5", "f2"}, false);
-
- SchemaChange[] schemaChanges = new SchemaChange[] {
- addOption, removeOption, addColumn, addColumnAfterField, renameColumn, dropColumn,
- updateColumnComment, updateNestedColumnComment, updateColumnType, updateColumnPosition,
- updateColumnNullability, updateNestedColumnNullability};
+ SchemaChange updateNestedColumnNullability =
+ SchemaChange.updateColumnNullability(new String[] {"col5", "f2"}, false);
+
+ SchemaChange[] schemaChanges =
+ new SchemaChange[] {
+ addOption,
+ removeOption,
+ addColumn,
+ addColumnAfterField,
+ renameColumn,
+ dropColumn,
+ updateColumnComment,
+ updateNestedColumnComment,
+ updateColumnType,
+ updateColumnPosition,
+ updateColumnNullability,
+ updateNestedColumnNullability
+ };
try {
catalog.alterTable(identifier, Arrays.asList(schemaChanges), false);
} catch (Catalog.TableNotExistException e) {
// do something
- } catch (Catalog.TableAlreadyExistException e) {
+ } catch (Catalog.ColumnAlreadyExistException e) {
// do something
- } catch (Catalog.DatabaseNotExistException e) {
+ } catch (Catalog.ColumnNotExistException e) {
// do something
- }
+ }
}
}
```
@@ -417,30 +448,36 @@ The reading is divided into two stages:
```java
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
import com.google.common.collect.Lists;
-import java.io.IOException;
import java.util.List;
public class ReadTable {
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
// 1. Create a ReadBuilder and push filter (`withFilter`)
// and projection (`withProjection`) if necessary
- // [{"Alice", 12},{"Bob", 5},{"Emily", 18}]
- PredicateBuilder builder =
- new PredicateBuilder(RowType.of(DataTypes.STRING(),DataTypes.INT()));
- Predicate notNull = builder.isNotNull(0);
- Predicate greaterOrEqual = builder.greaterOrEqual(1, 12);
+ Table table = GetTable.getTable();
- ReadBuilder readBuilder = table.newReadBuilder()
+ PredicateBuilder builder =
+ new PredicateBuilder(RowType.of(DataTypes.STRING(), DataTypes.INT()));
+ Predicate notNull = builder.isNotNull(0);
+ Predicate greaterOrEqual = builder.greaterOrEqual(1, 12);
+
+ int[] projection = new int[] {0, 1};
+
+ ReadBuilder readBuilder =
+ table.newReadBuilder()
.withProjection(projection)
.withFilter(Lists.newArrayList(notNull, greaterOrEqual));
@@ -452,7 +489,7 @@ public class ReadTable {
// 4. Read a split in task
TableRead read = readBuilder.newRead();
RecordReader<InternalRow> reader = read.createReader(splits);
- reader.forEachRemaining(ReadTable::readRow);
+ reader.forEachRemaining(System.out::println);
}
}
```
@@ -466,21 +503,21 @@ The writing is divided into two stages:
When the commit fails for certain reason, abort unsuccessful commit via CommitMessages.
```java
-import java.util.List;
-
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-public class WriteTable {
+import java.util.List;
- public static void main(String[] args) {
+public class BatchWrite {
+ public static void main(String[] args) throws Exception {
// 1. Create a WriteBuilder (Serializable)
- BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder()
- .withOverwrite(staticPartition);
+ Table table = GetTable.getTable();
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder().withOverwrite();
// 2. Write records in distributed tasks
BatchTableWrite write = writeBuilder.newWrite();
@@ -498,7 +535,7 @@ public class WriteTable {
// 3. Collect all CommitMessages to a global node and commit
BatchTableCommit commit = writeBuilder.newCommit();
commit.commit(messages);
-
+
// Abort unsuccessful commit to delete data files
// commit.abort(messages);
}
@@ -513,24 +550,40 @@ StreamTableScan provides the ability to checkpoint and restore, which can let yo
during stream reading.
```java
-import java.io.IOException;
-import java.util.List;
-
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
public class StreamReadTable {
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws Exception {
// 1. Create a ReadBuilder and push filter (`withFilter`)
// and projection (`withProjection`) if necessary
- ReadBuilder readBuilder = table.newReadBuilder()
+ Table table = GetTable.getTable();
+
+ PredicateBuilder builder =
+ new PredicateBuilder(RowType.of(DataTypes.STRING(), DataTypes.INT()));
+ Predicate notNull = builder.isNotNull(0);
+ Predicate greaterOrEqual = builder.greaterOrEqual(1, 12);
+
+ int[] projection = new int[] {0, 1};
+
+ ReadBuilder readBuilder =
+ table.newReadBuilder()
.withProjection(projection)
- .withFilter(filter);
+ .withFilter(Lists.newArrayList(notNull, greaterOrEqual));
// 2. Plan splits in 'Coordinator' (or named 'Driver')
StreamTableScan scan = readBuilder.newStreamScan();
@@ -539,13 +592,15 @@ public class StreamReadTable {
// Distribute these splits to different tasks
Long state = scan.checkpoint();
- // can be restored in scan.restore(state) after failover
- }
+ // can be restored in scan.restore(state) after fail over
- // 3. Read a split in task
- TableRead read = readBuilder.newRead();
- RecordReader<InternalRow> reader = read.createReader(splits);
- reader.forEachRemaining(row -> System.out.println(row));
+ // 3. Read a split in task
+ TableRead read = readBuilder.newRead();
+ RecordReader<InternalRow> reader = read.createReader(splits);
+ reader.forEachRemaining(System.out::println);
+
+ Thread.sleep(1000);
+ }
}
}
```
@@ -565,44 +620,53 @@ Key points to achieve exactly-once consistency:
to exclude the committed messages by commitIdentifier.
```java
-import java.util.List;
-
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
+import java.util.List;
+
public class StreamWriteTable {
public static void main(String[] args) throws Exception {
// 1. Create a WriteBuilder (Serializable)
+ Table table = GetTable.getTable();
StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
// 2. Write records in distributed tasks
StreamTableWrite write = writeBuilder.newWrite();
// commitIdentifier like Flink checkpointId
long commitIdentifier = 0;
+
while (true) {
+ GenericRow record1 = GenericRow.of(BinaryString.fromString("Alice"), 12);
+ GenericRow record2 = GenericRow.of(BinaryString.fromString("Bob"), 5);
+ GenericRow record3 = GenericRow.of(BinaryString.fromString("Emily"), 18);
write.write(record1);
write.write(record2);
write.write(record3);
- List<CommitMessage> messages = write.prepareCommit(
- false, commitIdentifier);
+ List<CommitMessage> messages = write.prepareCommit(false, commitIdentifier);
commitIdentifier++;
- }
- // 3. Collect all CommitMessages to a global node and commit
- StreamTableCommit commit = writeBuilder.newCommit();
- commit.commit(commitIdentifier, messages);
-
- // 4. When failure occurs and you're not sure if the commit process is successful,
- // you can use `filterAndCommit` to retry the commit process.
- // Succeeded commits will be automatically skipped.
- /*
- Map<Long, List<CommitMessage>> commitIdentifiersAndMessages = new HashMap<>();
- commitIdentifiersAndMessages.put(commitIdentifier, messages);
- commit.filterAndCommit(commitIdentifiersAndMessages);
- */
+ // 3. Collect all CommitMessages to a global node and commit
+ StreamTableCommit commit = writeBuilder.newCommit();
+ commit.commit(commitIdentifier, messages);
+
+ // 4. When failure occurs and you're not sure if the commit process is successful,
+ // you can use `filterAndCommit` to retry the commit process.
+ // Succeeded commits will be automatically skipped.
+ /*
+ Map<Long, List<CommitMessage>> commitIdentifiersAndMessages = new HashMap<>();
+ commitIdentifiersAndMessages.put(commitIdentifier, messages);
+ commit.filterAndCommit(commitIdentifiersAndMessages);
+ */
+
+ Thread.sleep(1000);
+ }
}
}
```
@@ -642,5 +706,3 @@ public class StreamWriteTable {
| <= | org.apache.paimon.predicate.PredicateBuilder.LessOrEqual |
| > | org.apache.paimon.predicate.PredicateBuilder.GreaterThan |
| >= | org.apache.paimon.predicate.PredicateBuilder.GreaterOrEqual |
-
-
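For readers skimming the java-api.md changes above, here is a minimal end-to-end sketch that strings the updated snippets together (filesystem catalog → database → table → batch write → batch read). It is illustrative only: the warehouse path and the `my_db.my_table` schema are placeholder assumptions, not part of this commit.

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataTypes;

import java.util.List;

public class JavaApiRoundTrip {

    public static void main(String[] args) throws Exception {
        // Filesystem catalog over a local warehouse path (placeholder).
        CatalogContext context = CatalogContext.create(new Path("/tmp/paimon_warehouse"));
        Catalog catalog = CatalogFactory.createCatalog(context);

        // Database and table, ignoring "already exists" on re-runs.
        catalog.createDatabase("my_db", true);
        Schema schema =
                Schema.newBuilder()
                        .column("f0", DataTypes.STRING())
                        .column("f1", DataTypes.INT())
                        .primaryKey("f0")
                        .build();
        Identifier identifier = Identifier.create("my_db", "my_table");
        catalog.createTable(identifier, schema, true);
        Table table = catalog.getTable(identifier);

        // Batch write: write rows, then commit the collected messages.
        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
        BatchTableWrite write = writeBuilder.newWrite();
        write.write(GenericRow.of(BinaryString.fromString("Alice"), 12));
        write.write(GenericRow.of(BinaryString.fromString("Bob"), 5));
        List<CommitMessage> messages = write.prepareCommit();
        BatchTableCommit commit = writeBuilder.newCommit();
        commit.commit(messages);

        // Batch read: plan splits, then read them and print each row.
        ReadBuilder readBuilder = table.newReadBuilder();
        List<Split> splits = readBuilder.newScan().plan().splits();
        TableRead read = readBuilder.newRead();
        RecordReader<InternalRow> reader = read.createReader(splits);
        reader.forEachRemaining(System.out::println);
    }
}
```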
diff --git a/docs/content/concepts/file-operations.md b/docs/content/concepts/file-operations.md
index 58696d836..afcd7515b 100644
--- a/docs/content/concepts/file-operations.md
+++ b/docs/content/concepts/file-operations.md
@@ -155,7 +155,7 @@ The second `commit` takes place and executing `SELECT * FROM T` will return
10 rows. A new snapshot, namely `snapshot-2`, is created and gives us the following physical file layout:
```bash
- % ls -atR .
+ % ls -1tR .
./T:
dt=20230501
dt=20230502
@@ -178,12 +178,12 @@ EARLIEST
snapshot-1
./T/manifest:
-manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-1 # delta manifest list for snapshot-2
-manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-0 # base manifest list for snapshot-2
+manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-1 # delta manifest list for snapshot-2
+manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-0 # base manifest list for snapshot-2
manifest-f1267033-e246-4470-a54c-5c27fdbdd074-0 # manifest file for snapshot-2
-manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1 # delta manifest list for snapshot-1
-manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0 # base manifest list for snapshot-1
+manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1 # delta manifest list for snapshot-1
+manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0 # base manifest list for snapshot-1
manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0 # manifest file for snapshot-1
./T/dt=20230501/bucket-0:
@@ -245,6 +245,7 @@ Let's trigger the full-compaction now, and run a dedicated compaction job throug
{{< label Batch >}}
```bash
<FLINK_HOME>/bin/flink run \
+ -D execution.runtime-mode=batch \
/path/to/paimon-flink-action-{{< version >}}.jar \
compact \
--warehouse <warehouse-path> \
diff --git a/docs/content/concepts/primary-key-table.md b/docs/content/concepts/primary-key-table.md
index 06153616f..cb47455e9 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -269,11 +269,11 @@ This is of great help in replacing log deduplication in streaming computation.
Streaming queries will continuously produce the latest changes. These changes can come from the underlying table files or from an [external log system]({{< ref "concepts/external-log-systems" >}}) like Kafka. Compared to the external log system, changes from table files have lower cost but higher latency (depending on how often snapshots are created).
-By specifying the `changelog-producer` table property when creating the table, users can choose the pattern of changes produced from files.
+By specifying the `changelog-producer` table property when creating the table, users can choose the pattern of changes produced from table files.
{{< hint info >}}
-The `changelog-producer` table property only affects changelog from files. It does not affect the external log system.
+The `changelog-producer` table property only affects changelog from table files. It does not affect the external log system.
{{< /hint >}}
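The `changelog-producer` wording fix above concerns a table property. As an illustrative sketch (not part of this commit), the property can be set through the Java API at table-creation time; the `lookup` value and the `Schema.Builder#option` call below are assumptions based on the Paimon documentation.

```java
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class ChangelogProducerExample {

    // Builds a primary-key table schema whose changelog from table files is
    // produced by the 'lookup' changelog producer instead of the default 'none'.
    public static Schema changelogTableSchema() {
        return Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column("name", DataTypes.STRING())
                .primaryKey("id")
                .option("changelog-producer", "lookup")
                .build();
    }
}
```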
diff --git a/paimon-benchmark/paimon-cluster-benchmark/README.md b/paimon-benchmark/paimon-cluster-benchmark/README.md
index 2907639c1..c67a5597d 100644
--- a/paimon-benchmark/paimon-cluster-benchmark/README.md
+++ b/paimon-benchmark/paimon-cluster-benchmark/README.md
@@ -7,7 +7,7 @@ This is the cluster benchmark module for Paimon. Inspired by [Nexmark](https://g
* This benchmark only runs on Linux. You'll need a Linux environment (preferably an EMR cluster). For a more reasonable result, we recommend this cluster to have:
* One master node with 8 cores and 16GB RAM.
* Two worker nodes with 16 cores and 64GB RAM.
-* This benchmark runs on a standalone Flink cluster. Download Flink >= 1.15 from the [Apache Flink's website](https://flink.apache.org/downloads.html#apache-flink-1160) and setup a standalone cluster. Flink's job manager must be on the master node of your EMR cluster. We recommend the following Flink configurations:
+* This benchmark runs on a standalone Flink cluster. Download Flink >= 1.15 from the [Apache Flink's website](https://flink.apache.org/downloads.html#apache-flink-1160) and set up a standalone cluster. Flink's job manager must be on the master node of your EMR cluster. We recommend the following Flink configurations:
```yaml
jobmanager.memory.process.size: 4096m
taskmanager.memory.process.size: 4096m
@@ -21,13 +21,13 @@ This is the cluster benchmark module for Paimon. Inspired by [Nexmark](https://g
* This benchmark needs the `FLINK_HOME` environment variable. Set `FLINK_HOME` to your Flink directory.
### Setup Benchmark
-* Build this module with command `mvn clean package`.
-* Copy `target/paimon-benchmark-bin/paimon-benchmark` to the master node of your EMR cluster.
-* Modify `paimon-benchmark/conf/benchmark.yaml` on the master node. You must change these config options:
- * `benchmark.metric.reporter.host` and `flink.rest.address`: set these to the address of master node of your EMR cluster.
- * `benchmark.sink.path` is the path to which queries insert records. This should point to a non-existing path. Contents of this path will be removed before each test.
-* Copy `paimon-benchmark` to every worker node of your EMR cluster.
-* Run `paimon-benchmark/bin/setup_cluster.sh` in master node. This activates the CPU metrics collector in worker nodes. Note that if you restart your Flink cluster, you must also restart the CPU metrics collectors. To stop CPU metrics collectors, run `paimon-benchmark/bin/shutdown_cluster.sh` in master node.
+1. Build this module with command `mvn clean package -DskipTests -pl paimon-benchmark/paimon-cluster-benchmark/ -am`.
+2. Copy `paimon-benchmark/paimon-cluster-benchmark/target/paimon-benchmark-bin/paimon-benchmark` to the master node of your EMR cluster.
+3. Modify `paimon-benchmark/conf/benchmark.yaml` on the master node. You must change these config options:
+ * `benchmark.metric.reporter.host` and `flink.rest.address`: set these to the address of master node of your EMR cluster.
+ * `benchmark.sink.path` is the path to which queries insert records. This should point to a non-existing path. Contents of this path will be removed before each test.
+4. Copy `paimon-benchmark` to every worker node of your EMR cluster.
+5. Run `paimon-benchmark/bin/setup_cluster.sh` in master node. This activates the CPU metrics collector in worker nodes. Note that if you restart your Flink cluster, you must also restart the CPU metrics collectors. To stop CPU metrics collectors, run `paimon-benchmark/bin/shutdown_cluster.sh` in master node.
### Run Benchmark
* Run `paimon-benchmark/bin/run_benchmark.sh <query> <sink>` to run `<query>` for `<sink>`. Currently `<query>` can be `q1` or `all`, and sink can only be `paimon`.
diff --git a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/Benchmark.java b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/Benchmark.java
index c5f137a9f..e3e2b4027 100644
--- a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/Benchmark.java
+++ b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/Benchmark.java
@@ -33,9 +33,11 @@ import org.apache.flink.configuration.Configuration;
import java.io.File;
import java.nio.file.Path;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
/** The entry point to run benchmark. */
@@ -59,7 +61,7 @@ public class Benchmark {
public static void main(String[] args) throws Exception {
if (args.length != 6) {
throw new RuntimeException(
- "Usage: --location /path/to/benchmark --queries q1,q3 --sinks paimon,hudi_merge_on_read");
+ "Usage: --location /path/to/benchmark --queries q1 --sinks paimon");
}
Options options = getOptions();
@@ -71,20 +73,41 @@ public class Benchmark {
String queriesValue = line.getOptionValue(QUERIES.getOpt());
List<Query> queries = Query.load(location);
if (!"all".equalsIgnoreCase(queriesValue)) {
- List<String> wantedQueries =
+ Set<String> wantedQueries =
Arrays.stream(queriesValue.split(","))
.map(String::trim)
- .collect(Collectors.toList());
+ .collect(Collectors.toSet());
+ Set<String> supportedQueries =
+ queries.stream().map(Query::name).collect(Collectors.toSet());
+ Set<String> unSupportedQueries = new HashSet<>(wantedQueries);
+ unSupportedQueries.removeAll(supportedQueries);
+ if (!unSupportedQueries.isEmpty()) {
+ throw new RuntimeException(
+ "Unsupported queries: "
+ + unSupportedQueries
+ + ", current supported queries are "
+ + supportedQueries);
+ }
queries.removeIf(q -> !wantedQueries.contains(q.name()));
}
String sinksValue = line.getOptionValue(SINKS.getOpt());
List<Sink> sinks = Sink.load(location);
if (!"all".equalsIgnoreCase(sinksValue)) {
- List<String> wantedSinks =
+ Set<String> wantedSinks =
Arrays.stream(sinksValue.split(","))
.map(String::trim)
- .collect(Collectors.toList());
+ .collect(Collectors.toSet());
+ Set<String> supportedSinks = sinks.stream().map(Sink::name).collect(Collectors.toSet());
+ Set<String> unSupportedSinks = new HashSet<>(wantedSinks);
+ unSupportedSinks.removeAll(supportedSinks);
+ if (!unSupportedSinks.isEmpty()) {
+ throw new RuntimeException(
+ "Unsupported sinks: "
+ + unSupportedSinks
+ + ", current supported sinks are "
+ + supportedSinks);
+ }
sinks.removeIf(s -> !wantedSinks.contains(s.name()));
}
@@ -212,6 +235,6 @@ public class Benchmark {
}
}
builder.append(separator);
- System.err.println(builder.toString());
+ System.err.println(builder);
}
}
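The Benchmark.java change above swaps the query/sink name lists for sets and rejects unknown names up front. Below is a standalone distillation of that validation pattern, offered as a sketch; the class and method names are illustrative and not from the commit.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class NameValidation {

    // Parses a comma-separated request and fails fast if it contains names
    // outside the supported set, mirroring the checks added to Benchmark#main.
    public static Set<String> validate(String requestedCsv, Set<String> supported, String what) {
        Set<String> requested =
                Arrays.stream(requestedCsv.split(","))
                        .map(String::trim)
                        .collect(Collectors.toSet());
        Set<String> unsupported = new HashSet<>(requested);
        unsupported.removeAll(supported);
        if (!unsupported.isEmpty()) {
            throw new RuntimeException(
                    "Unsupported " + what + ": " + unsupported
                            + ", current supported " + what + " are " + supported);
        }
        return requested;
    }
}
```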
diff --git a/paimon-benchmark/paimon-cluster-benchmark/src/main/resources/bin/config.sh b/paimon-benchmark/paimon-cluster-benchmark/src/main/resources/bin/config.sh
index 65dc0c7b1..4fe71a0c0 100644
--- a/paimon-benchmark/paimon-cluster-benchmark/src/main/resources/bin/config.sh
+++ b/paimon-benchmark/paimon-cluster-benchmark/src/main/resources/bin/config.sh
@@ -47,7 +47,7 @@ export BENCHMARK_LOG_DIR
export BENCHMARK_CONF_DIR
export BENCHMARK_BIN_DIR
-# Auxilliary function which extracts the name of host from a line which
+# Auxiliary function which extracts the name of host from a line which
# also potentially includes topology information and the taskManager type
extractHostName() {
# handle comments: extract first part of string (before first # character)