This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 511d3bc9c [cdc] Introduce RichCdcSinkBuilder API (#1440)
511d3bc9c is described below
commit 511d3bc9c37672119c4b02558ff7705df2311cc8
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 28 12:14:33 2023 +0800
[cdc] Introduce RichCdcSinkBuilder API (#1440)
---
docs/content/api/flink-api.md | 71 ++++--------
.../apache/paimon/flink/sink/cdc/CdcRecord.java | 5 +
.../flink/sink/cdc/RichCdcParserFactory.java | 70 ++++++++++++
.../paimon/flink/sink/cdc/RichCdcRecord.java | 119 +++++++++++++++++++++
.../paimon/flink/sink/cdc/RichCdcSinkBuilder.java | 61 +++++++++++
5 files changed, 275 insertions(+), 51 deletions(-)
diff --git a/docs/content/api/flink-api.md b/docs/content/api/flink-api.md
index 310152ec9..b0c65d20f 100644
--- a/docs/content/api/flink-api.md
+++ b/docs/content/api/flink-api.md
@@ -149,47 +149,44 @@ Paimon supports ingest data into Paimon tables with
schema evolution.
- You can use the Java API to write CDC records into Paimon tables.
- You can write records to a Paimon partial-update table while adding columns
dynamically.
-Here is an example to use `CdcSinkBuilder` API:
+Here is an example of using the `RichCdcSinkBuilder` API:
```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.sink.cdc.CdcRecord;
-import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
-import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.flink.sink.cdc.RichCdcRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcSinkBuilder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import java.util.List;
-import java.util.Optional;
+import static org.apache.paimon.types.RowKind.INSERT;
public class WriteCdcToTable {
public static void writeTo() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- // create a Cdc DataStream
- DataStream<String> dataStream =
+ DataStream<RichCdcRecord> dataStream =
env.fromElements(
- "INSERT INTO T VALUES (1, 1)",
- "INSERT INTO T VALUES (2, 2)",
- "ALTER TABLE T ADD (c INT)",
- "INSERT INTO T VALUES (3, 3, 3)",
- "INSERT INTO T VALUES (4, 4, 4)");
-
- CdcSinkBuilder<String> builder = new CdcSinkBuilder<>();
- builder.withInput(dataStream);
- builder.withTable(createTableIfNotExists());
- builder.withParserFactory(new EventParserFactory());
- builder.build();
+ RichCdcRecord.builder(INSERT)
+ .field("order_id", DataTypes.BIGINT(), "123")
+ .field("price", DataTypes.DOUBLE(), "62.2")
+ .build(),
+ // dt field will be added with schema evolution
+ RichCdcRecord.builder(INSERT)
+ .field("order_id", DataTypes.BIGINT(), "245")
+ .field("price", DataTypes.DOUBLE(), "82.1")
+ .field("dt", DataTypes.TIMESTAMP(),
"2023-06-12 20:21:12")
+ .build());
+
+ new
RichCdcSinkBuilder().withInput(dataStream).withTable(createTableIfNotExists()).build();
env.execute();
}
@@ -199,9 +196,9 @@ public class WriteCdcToTable {
Catalog catalog = CatalogFactory.createCatalog(context);
Schema.Builder schemaBuilder = Schema.newBuilder();
- schemaBuilder.primaryKey("a");
- schemaBuilder.column("a", DataTypes.INT());
- schemaBuilder.column("b", DataTypes.INT());
+ schemaBuilder.primaryKey("order_id");
+ schemaBuilder.column("order_id", DataTypes.BIGINT());
+ schemaBuilder.column("price", DataTypes.DOUBLE());
Schema schema = schemaBuilder.build();
Identifier identifier = Identifier.create("my_db", "T");
try {
@@ -211,33 +208,5 @@ public class WriteCdcToTable {
}
return catalog.getTable(identifier);
}
-
- private static class EventParserFactory implements
EventParser.Factory<String> {
-
- @Override
- public EventParser<String> create() {
- return new EventParser<String>() {
-
- private String event;
-
- @Override
- public void setRawEvent(String rawEvent) {
- event = rawEvent;
- }
-
- @Override
- public List<DataField> parseSchemaChange() {
- // parse from ADD (c INT)
- return ...;
- }
-
- @Override
- public List<CdcRecord> parseRecords() {
- // parse from VALUES (1, 1)
- return ...;
- }
- };
- }
- }
}
```
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
index 24fd957b3..c4b7ea7a9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
@@ -57,6 +57,11 @@ public class CdcRecord implements Serializable {
return Objects.equals(kind, that.kind) && Objects.equals(fields,
that.fields);
}
+ @Override
+ public int hashCode() {
+ return Objects.hash(kind, fields);
+ }
+
@Override
public String toString() {
return kind.shortString() + " " + fields;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcParserFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcParserFactory.java
new file mode 100644
index 000000000..3cc9cae08
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcParserFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A {@link EventParser.Factory} for {@link RichCdcRecord}.
+ *
+ * <p>Schema changes are derived from the field types carried by each record. Record data is
+ * forwarded as a single {@link CdcRecord}.
+ */
+public class RichCdcParserFactory implements EventParser.Factory<RichCdcRecord> {
+
+    /**
+     * Creates a new parser. Each returned parser keeps its own memory of the field types it has
+     * already reported.
+     *
+     * <p>Declared to return the {@link EventParser} interface rather than the private
+     * implementation class, so callers never see an inaccessible type in the signature.
+     */
+    @Override
+    public EventParser<RichCdcRecord> create() {
+        return new RichEventParser();
+    }
+
+    /** Stateful parser turning {@link RichCdcRecord}s into schema changes and cdc records. */
+    private static class RichEventParser implements EventParser<RichCdcRecord> {
+
+        // Field name -> type last reported by parseSchemaChange(); used to report only deltas.
+        private final Map<String, DataType> previousDataFields = new HashMap<>();
+
+        // The event most recently passed to setRawEvent().
+        private RichCdcRecord record;
+
+        @Override
+        public void setRawEvent(RichCdcRecord rawEvent) {
+            this.record = rawEvent;
+        }
+
+        /**
+         * Returns the fields of the current record that were not seen before, or whose type
+         * differs from the type last reported for that name. These fields are remembered so
+         * they are not reported again.
+         *
+         * <p>Every returned field uses id 0. NOTE(review): presumably the real id is assigned
+         * when the schema change is applied; confirm against the schema-evolution code.
+         */
+        @Override
+        public List<DataField> parseSchemaChange() {
+            List<DataField> change = new ArrayList<>();
+            for (Map.Entry<String, DataType> entry : record.fieldTypes().entrySet()) {
+                String field = entry.getKey();
+                DataType type = entry.getValue();
+                if (!Objects.equals(previousDataFields.get(field), type)) {
+                    previousDataFields.put(field, type);
+                    change.add(new DataField(0, field, type));
+                }
+            }
+            return change;
+        }
+
+        /** Converts the current record into exactly one {@link CdcRecord}. */
+        @Override
+        public List<CdcRecord> parseRecords() {
+            return Collections.singletonList(record.toCdcRecord());
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
new file mode 100644
index 000000000..7d02c8a60
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.annotation.Experimental;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowKind;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A change message that carries both the schema (the type of every field) and the data (the
+ * string value of every field) of one row.
+ *
+ * <p>Records produced by {@link Builder} keep their fields in the order they were added.
+ */
+@Experimental
+public class RichCdcRecord implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowKind kind;
+    private final Map<String, DataType> fieldTypes;
+    private final Map<String, String> fieldValues;
+
+    /**
+     * Creates a record from the given maps. The maps are stored as given, not copied.
+     *
+     * @param kind the row kind of this change
+     * @param fieldTypes field name to field type
+     * @param fieldValues field name to field value rendered as a string
+     */
+    public RichCdcRecord(
+            RowKind kind, Map<String, DataType> fieldTypes, Map<String, String> fieldValues) {
+        this.kind = kind;
+        this.fieldTypes = fieldTypes;
+        this.fieldValues = fieldValues;
+    }
+
+    public RowKind kind() {
+        return kind;
+    }
+
+    public Map<String, DataType> fieldTypes() {
+        return fieldTypes;
+    }
+
+    public Map<String, String> fieldValues() {
+        return fieldValues;
+    }
+
+    /** Converts to a {@link CdcRecord}, keeping the row kind and values and dropping the types. */
+    public CdcRecord toCdcRecord() {
+        return new CdcRecord(kind, fieldValues);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RichCdcRecord that = (RichCdcRecord) o;
+        return kind == that.kind
+                && Objects.equals(fieldTypes, that.fieldTypes)
+                && Objects.equals(fieldValues, that.fieldValues);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(kind, fieldTypes, fieldValues);
+    }
+
+    @Override
+    public String toString() {
+        return "{"
+                + "kind="
+                + kind
+                + ", fieldTypes="
+                + fieldTypes
+                + ", fieldValues="
+                + fieldValues
+                + '}';
+    }
+
+    /** Returns a new {@link Builder} for a record of the given row kind. */
+    public static Builder builder(RowKind kind) {
+        return new Builder(kind);
+    }
+
+    /**
+     * Builder for {@link RichCdcRecord}.
+     *
+     * <p>Fields keep the order in which {@link #field} is first called for them. Adding a name a
+     * second time replaces its type and value but keeps its original position.
+     */
+    public static class Builder {
+
+        private final RowKind kind;
+        // LinkedHashMap instead of HashMap: iteration order of fieldTypes() decides the order in
+        // which RichCdcParserFactory reports new fields, so it must follow declaration order
+        // rather than hash order.
+        private final Map<String, DataType> fieldTypes = new LinkedHashMap<>();
+        private final Map<String, String> fieldValues = new LinkedHashMap<>();
+
+        public Builder(RowKind kind) {
+            this.kind = kind;
+        }
+
+        /** Adds (or replaces) a field together with its type and string value. */
+        public Builder field(String name, DataType type, String value) {
+            fieldTypes.put(name, type);
+            fieldValues.put(name, value);
+            return this;
+        }
+
+        /**
+         * Builds a record from the fields added so far. The maps are copied, so later calls to
+         * {@link #field} on this builder do not change records that were already built.
+         */
+        public RichCdcRecord build() {
+            return new RichCdcRecord(
+                    kind, new LinkedHashMap<>(fieldTypes), new LinkedHashMap<>(fieldValues));
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
new file mode 100644
index 000000000..130d3812c
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.annotation.Experimental;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+
+import javax.annotation.Nullable;
+
+/**
+ * Builder for a sink that syncs a stream of {@link RichCdcRecord}s into one Paimon table.
+ *
+ * <p>Records are parsed with {@link RichCdcParserFactory}. The sink itself is assembled by a
+ * {@link CdcSinkBuilder}.
+ */
+@Experimental
+public class RichCdcSinkBuilder {
+
+    private DataStream<RichCdcRecord> input;
+    private Table table;
+
+    @Nullable private Integer parallelism;
+
+    /** Sets the stream of records to write. */
+    public RichCdcSinkBuilder withInput(DataStream<RichCdcRecord> input) {
+        this.input = input;
+        return this;
+    }
+
+    /** Sets the target Paimon table. */
+    public RichCdcSinkBuilder withTable(Table table) {
+        this.table = table;
+        return this;
+    }
+
+    /** Sets the sink parallelism; {@code null} is passed through unchanged to the delegate. */
+    public RichCdcSinkBuilder withParallelism(@Nullable Integer parallelism) {
+        this.parallelism = parallelism;
+        return this;
+    }
+
+    /** Configures a {@link CdcSinkBuilder} with the collected settings and builds the sink. */
+    public DataStreamSink<?> build() {
+        CdcSinkBuilder<RichCdcRecord> delegate = new CdcSinkBuilder<>();
+        delegate.withTable(table);
+        delegate.withInput(input);
+        delegate.withParserFactory(new RichCdcParserFactory());
+        delegate.withParallelism(parallelism);
+        return delegate.build();
+    }
+}