This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 511d3bc9c [cdc] Introduce RichCdcSinkBuilder API (#1440)
511d3bc9c is described below

commit 511d3bc9c37672119c4b02558ff7705df2311cc8
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 28 12:14:33 2023 +0800

    [cdc] Introduce RichCdcSinkBuilder API (#1440)
---
 docs/content/api/flink-api.md                      |  71 ++++--------
 .../apache/paimon/flink/sink/cdc/CdcRecord.java    |   5 +
 .../flink/sink/cdc/RichCdcParserFactory.java       |  70 ++++++++++++
 .../paimon/flink/sink/cdc/RichCdcRecord.java       | 119 +++++++++++++++++++++
 .../paimon/flink/sink/cdc/RichCdcSinkBuilder.java  |  61 +++++++++++
 5 files changed, 275 insertions(+), 51 deletions(-)

diff --git a/docs/content/api/flink-api.md b/docs/content/api/flink-api.md
index 310152ec9..b0c65d20f 100644
--- a/docs/content/api/flink-api.md
+++ b/docs/content/api/flink-api.md
@@ -149,47 +149,44 @@ Paimon supports ingest data into Paimon tables with 
schema evolution.
 - You can use Java API to write cdc records into Paimon Tables.
 - You can write records to Paimon's partial-update table with adding columns 
dynamically.
 
-Here is an example to use `CdcSinkBuilder` API:
+Here is an example of using the `RichCdcSinkBuilder` API:
 
 ```java
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.sink.cdc.CdcRecord;
-import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
-import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.flink.sink.cdc.RichCdcRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcSinkBuilder;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
-import java.util.List;
-import java.util.Optional;
+import static org.apache.paimon.types.RowKind.INSERT;
 
 public class WriteCdcToTable {
 
     public static void writeTo() throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-        // create a Cdc DataStream
-        DataStream<String> dataStream =
+        DataStream<RichCdcRecord> dataStream =
                 env.fromElements(
-                        "INSERT INTO T VALUES (1, 1)",
-                        "INSERT INTO T VALUES (2, 2)",
-                        "ALTER TABLE T ADD (c INT)",
-                        "INSERT INTO T VALUES (3, 3, 3)",
-                        "INSERT INTO T VALUES (4, 4, 4)");
-
-        CdcSinkBuilder<String> builder = new CdcSinkBuilder<>();
-        builder.withInput(dataStream);
-        builder.withTable(createTableIfNotExists());
-        builder.withParserFactory(new EventParserFactory());
-        builder.build();
+                        RichCdcRecord.builder(INSERT)
+                                .field("order_id", DataTypes.BIGINT(), "123")
+                                .field("price", DataTypes.DOUBLE(), "62.2")
+                                .build(),
+                        // dt field will be added with schema evolution
+                        RichCdcRecord.builder(INSERT)
+                                .field("order_id", DataTypes.BIGINT(), "245")
+                                .field("price", DataTypes.DOUBLE(), "82.1")
+                                .field("dt", DataTypes.TIMESTAMP(), 
"2023-06-12 20:21:12")
+                                .build());
+
+        new 
RichCdcSinkBuilder().withInput(dataStream).withTable(createTableIfNotExists()).build();
 
         env.execute();
     }
@@ -199,9 +196,9 @@ public class WriteCdcToTable {
         Catalog catalog = CatalogFactory.createCatalog(context);
 
         Schema.Builder schemaBuilder = Schema.newBuilder();
-        schemaBuilder.primaryKey("a");
-        schemaBuilder.column("a", DataTypes.INT());
-        schemaBuilder.column("b", DataTypes.INT());
+        schemaBuilder.primaryKey("order_id");
+        schemaBuilder.column("order_id", DataTypes.BIGINT());
+        schemaBuilder.column("price", DataTypes.DOUBLE());
         Schema schema = schemaBuilder.build();
         Identifier identifier = Identifier.create("my_db", "T");
         try {
@@ -211,33 +208,5 @@ public class WriteCdcToTable {
         }
         return catalog.getTable(identifier);
     }
-
-    private static class EventParserFactory implements 
EventParser.Factory<String> {
-
-        @Override
-        public EventParser<String> create() {
-            return new EventParser<String>() {
-
-                private String event;
-
-                @Override
-                public void setRawEvent(String rawEvent) {
-                    event = rawEvent;
-                }
-
-                @Override
-                public List<DataField> parseSchemaChange() {
-                    // parse from ADD (c INT)
-                    return ...;
-                }
-
-                @Override
-                public List<CdcRecord> parseRecords() {
-                    // parse from VALUES (1, 1)
-                    return ...;
-                }
-            };
-        }
-    }
 }
 ```
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
index 24fd957b3..c4b7ea7a9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
@@ -57,6 +57,11 @@ public class CdcRecord implements Serializable {
         return Objects.equals(kind, that.kind) && Objects.equals(fields, 
that.fields);
     }
 
+    @Override
+    public int hashCode() {
+        return Objects.hash(kind, fields);
+    }
+
     @Override
     public String toString() {
         return kind.shortString() + " " + fields;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcParserFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcParserFactory.java
new file mode 100644
index 000000000..3cc9cae08
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcParserFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** A {@link EventParser.Factory} for {@link RichCdcRecord}. */
+public class RichCdcParserFactory implements 
EventParser.Factory<RichCdcRecord> {
+
+    @Override
+    public RichEventParser create() {
+        return new RichEventParser();
+    }
+
+    private static class RichEventParser implements EventParser<RichCdcRecord> 
{
+
+        private RichCdcRecord record;
+
+        private final Map<String, DataType> previousDataFields = new 
HashMap<>();
+
+        @Override
+        public void setRawEvent(RichCdcRecord rawEvent) {
+            this.record = rawEvent;
+        }
+
+        @Override
+        public List<DataField> parseSchemaChange() {
+            List<DataField> change = new ArrayList<>();
+            record.fieldTypes()
+                    .forEach(
+                            (field, type) -> {
+                                DataType previous = 
previousDataFields.get(field);
+                                if (!Objects.equals(previous, type)) {
+                                    previousDataFields.put(field, type);
+                                    change.add(new DataField(0, field, type));
+                                }
+                            });
+            return change;
+        }
+
+        @Override
+        public List<CdcRecord> parseRecords() {
+            return Collections.singletonList(record.toCdcRecord());
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
new file mode 100644
index 000000000..7d02c8a60
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.annotation.Experimental;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowKind;
+
import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
+
+/** A change message contains schema and data. */
+@Experimental
+public class RichCdcRecord implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowKind kind;
+    private final Map<String, DataType> fieldTypes;
+    private final Map<String, String> fieldValues;
+
+    public RichCdcRecord(
+            RowKind kind, Map<String, DataType> fieldTypes, Map<String, 
String> fieldValues) {
+        this.kind = kind;
+        this.fieldTypes = fieldTypes;
+        this.fieldValues = fieldValues;
+    }
+
+    public RowKind kind() {
+        return kind;
+    }
+
+    public Map<String, DataType> fieldTypes() {
+        return fieldTypes;
+    }
+
+    public Map<String, String> fieldValues() {
+        return fieldValues;
+    }
+
+    public CdcRecord toCdcRecord() {
+        return new CdcRecord(kind, fieldValues);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RichCdcRecord that = (RichCdcRecord) o;
+        return kind == that.kind
+                && Objects.equals(fieldTypes, that.fieldTypes)
+                && Objects.equals(fieldValues, that.fieldValues);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(kind, fieldTypes, fieldValues);
+    }
+
+    @Override
+    public String toString() {
+        return "{"
+                + "kind="
+                + kind
+                + ", fieldTypes="
+                + fieldTypes
+                + ", fieldValues="
+                + fieldValues
+                + '}';
+    }
+
+    public static Builder builder(RowKind kind) {
+        return new Builder(kind);
+    }
+
+    /** Builder for {@link RichCdcRecord}. */
+    public static class Builder {
+
+        private final RowKind kind;
+        private final Map<String, DataType> fieldTypes = new HashMap<>();
+        private final Map<String, String> fieldValues = new HashMap<>();
+
+        public Builder(RowKind kind) {
+            this.kind = kind;
+        }
+
+        public Builder field(String name, DataType type, String value) {
+            fieldTypes.put(name, type);
+            fieldValues.put(name, value);
+            return this;
+        }
+
+        public RichCdcRecord build() {
+            return new RichCdcRecord(kind, fieldTypes, fieldValues);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
new file mode 100644
index 000000000..130d3812c
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.annotation.Experimental;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+
+import javax.annotation.Nullable;
+
+/** Builder for sink when syncing {@link RichCdcRecord} records into one 
Paimon table. */
+@Experimental
+public class RichCdcSinkBuilder {
+
+    private DataStream<RichCdcRecord> input = null;
+    private Table table = null;
+
+    @Nullable private Integer parallelism;
+
+    public RichCdcSinkBuilder withInput(DataStream<RichCdcRecord> input) {
+        this.input = input;
+        return this;
+    }
+
+    public RichCdcSinkBuilder withTable(Table table) {
+        this.table = table;
+        return this;
+    }
+
+    public RichCdcSinkBuilder withParallelism(@Nullable Integer parallelism) {
+        this.parallelism = parallelism;
+        return this;
+    }
+
+    public DataStreamSink<?> build() {
+        CdcSinkBuilder<RichCdcRecord> builder = new CdcSinkBuilder<>();
+        return builder.withTable(table)
+                .withInput(input)
+                .withParserFactory(new RichCdcParserFactory())
+                .withParallelism(parallelism)
+                .build();
+    }
+}

Reply via email to