leonardBang commented on code in PR #4053:
URL: https://github.com/apache/flink-cdc/pull/4053#discussion_r2189376607


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink;
+
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEvent;
+import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussRecordSerializer;
+import org.apache.flink.cdc.connectors.fluss.sink.v2.RowWithOp;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.types.DataType;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.cdc.connectors.fluss.sink.v2.OperationType.APPEND;
+import static 
org.apache.flink.cdc.connectors.fluss.sink.v2.OperationType.DELETE;
+import static 
org.apache.flink.cdc.connectors.fluss.sink.v2.OperationType.UPSERT;
+import static 
org.apache.flink.cdc.connectors.fluss.utils.FlinkConversions.sameCdcColumnsIgnoreCommentAndDefaultValue;
+import static 
org.apache.flink.cdc.connectors.fluss.utils.FlinkConversions.toFlussSchema;
+
+/** Serialization schema that converts a CDC data record to a Fluss event. */
+public class FlussEventSerializationSchema implements 
FlussRecordSerializer<Event> {
+    private static final long serialVersionUID = 1L;
+
+    private transient Map<TableId, TableSchemaInfo> tableInfoMap;
+    private transient Connection connection;
+
+    @Override
+    public void open(Connection connection) {
+        this.tableInfoMap = new HashMap<>();
+        this.connection = connection;
+    }
+
+    @Override
+    public FlussEvent serialize(Event record) throws IOException {
+        if (record instanceof SchemaChangeEvent) {
+            applySchemaChangeEvent((SchemaChangeEvent) record);
+            return new FlussEvent(getTablePath(((SchemaChangeEvent) 
record).tableId()), null, true);
+        } else if (record instanceof DataChangeEvent) {
+            RowWithOp rowWithOp = applyDataChangeEvent((DataChangeEvent) 
record);
+            return new FlussEvent(
+                    getTablePath(((DataChangeEvent) record).tableId()),
+                    Collections.singletonList(rowWithOp),
+                    false);

Review Comment:
   ```suggestion
       public FlussEvent serialize(Event event) throws IOException {
           if (event instanceof SchemaChangeEvent) {
               applySchemaChangeEvent((SchemaChangeEvent) event);
               return new FlussEvent(getTablePath(((SchemaChangeEvent) 
event).tableId()), null, true);
           } else if (event instanceof DataChangeEvent) {
           RowWithOp rowWithOp = applyDataChangeEvent((DataChangeEvent) event);
               return new FlussEvent(
                       getTablePath(((DataChangeEvent) event).tableId()),
                       Collections.singletonList(rowWithOp),
                       false);
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink;
+
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEvent;
+import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussRecordSerializer;
+import org.apache.flink.cdc.connectors.fluss.sink.v2.RowWithOp;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.types.DataType;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.cdc.connectors.fluss.sink.v2.OperationType.APPEND;
+import static 
org.apache.flink.cdc.connectors.fluss.sink.v2.OperationType.DELETE;
+import static 
org.apache.flink.cdc.connectors.fluss.sink.v2.OperationType.UPSERT;
+import static 
org.apache.flink.cdc.connectors.fluss.utils.FlinkConversions.sameCdcColumnsIgnoreCommentAndDefaultValue;
+import static 
org.apache.flink.cdc.connectors.fluss.utils.FlinkConversions.toFlussSchema;
+
+/** Serialization schema that converts a CDC data record to a Fluss event. */
+public class FlussEventSerializationSchema implements 
FlussRecordSerializer<Event> {

Review Comment:
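   Could we use `event` and `record` consistently? This class serializes CDC `Event`s, so the serializer interface and parameter names should say "event" as well, e.g.: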
   ```suggestion
   public class FlussEventSerializationSchema implements 
FlussEventSerializer<Event> {
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink;
+
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropTableEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.table.api.ValidationException;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.metadata.DatabaseDescriptor;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TableInfo;
+import com.alibaba.fluss.metadata.TablePath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
+import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
+import static 
org.apache.flink.cdc.connectors.fluss.utils.FlinkConversions.toFlussTable;
+
+/** {@link MetadataApplier} for fluss. */
+public class FlussMetaDataApplier implements MetadataApplier {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlussMetaDataApplier.class);
+    private final Configuration flussClientConfig;
+    private final Map<String, String> tableProperties;
+    private final Map<String, List<String>> bucketKeysMap;
+    private final Map<String, Integer> bucketNumMap;
+    private Set<SchemaChangeEventType> enabledEventTypes =
+            new HashSet<>(Arrays.asList(CREATE_TABLE, DROP_TABLE));
+
+    public FlussMetaDataApplier(
+            Configuration flussClientConfig,
+            Map<String, String> tableProperties,
+            Map<String, List<String>> bucketKeysMap,
+            Map<String, Integer> bucketNumMap) {
+        this.flussClientConfig = flussClientConfig;
+        this.tableProperties = tableProperties;
+        this.bucketKeysMap = bucketKeysMap;
+        this.bucketNumMap = bucketNumMap;
+    }
+
+    @Override
+    public MetadataApplier setAcceptedSchemaEvolutionTypes(
+            Set<SchemaChangeEventType> schemaEvolutionTypes) {
+        enabledEventTypes = schemaEvolutionTypes;
+        return this;
+    }
+
+    @Override
+    public boolean acceptsSchemaEvolutionType(SchemaChangeEventType 
schemaChangeEventType) {
+        return enabledEventTypes.contains(schemaChangeEventType);
+    }
+
+    @Override
+    public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
+        return 
Arrays.stream(SchemaChangeEventTypeFamily.TABLE).collect(Collectors.toSet());
+    }
+
+    @Override
+    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
+        LOG.info("fluss metadata applier receive schemaChangeEvent {}", 
schemaChangeEvent);
+        if (schemaChangeEvent instanceof CreateTableEvent) {
+            CreateTableEvent createTableEvent = (CreateTableEvent) 
schemaChangeEvent;
+            applyCreateTable(createTableEvent);
+        } else if (schemaChangeEvent instanceof DropTableEvent) {
+            DropTableEvent dropTableEvent = (DropTableEvent) schemaChangeEvent;
+            applyDropTable(dropTableEvent);
+        } else {
+            throw new IllegalArgumentException(
+                    "fluss metadata applier only support CreateTableEvent now 
but receives "
+                            + schemaChangeEvent);
+        }
+    }
+
+    private void applyCreateTable(CreateTableEvent event) {
+        try (Connection connection = 
ConnectionFactory.createConnection(flussClientConfig);
+                Admin admin = connection.getAdmin()) {
+            TableId tableId = event.tableId();
+            TablePath tablePath = new TablePath(tableId.getSchemaName(), 
tableId.getTableName());
+            String tableIdentifier = tablePath.getDatabaseName() + "." + 
tablePath.getTableName();
+            List<String> bucketKeys = bucketKeysMap.get(tableIdentifier);
+            Integer bucketNum = bucketNumMap.get(tableIdentifier);
+            TableDescriptor inferredFlussTable =
+                    toFlussTable(event.getSchema(), bucketKeys, bucketNum, 
tableProperties);
+            admin.createDatabase(tablePath.getDatabaseName(), 
DatabaseDescriptor.EMPTY, true);
+            if (!admin.tableExists(tablePath).get()) {
+                admin.createTable(tablePath, inferredFlussTable, false).get();
+            } else {
+                TableInfo currentTableInfo = 
admin.getTableInfo(tablePath).get();
+                // sanity check to prevent unexpected table schema evolution.
+                sanityCheck(inferredFlussTable, currentTableInfo);
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to apply schema change {}", event, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void applyDropTable(DropTableEvent event) {
+        try (Connection connection = 
ConnectionFactory.createConnection(flussClientConfig);
+                Admin admin = connection.getAdmin()) {
+            TableId tableId = event.tableId();
+            TablePath tablePath = new TablePath(tableId.getSchemaName(), 
tableId.getTableName());
+            admin.dropTable(tablePath, true).get();
+        } catch (Exception e) {
+            LOG.error("Failed to apply schema change {}", event, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void sanityCheck(TableDescriptor inferredFlussTable, TableInfo 
currentTableInfo) {
+        List<String> inferredPrimaryKeyColumnNames =
+                
inferredFlussTable.getSchema().getPrimaryKeyColumnNames().stream()
+                        .sorted()
+                        .collect(Collectors.toList());
+        List<String> currentPrimaryKeyColumnNames =
+                
currentTableInfo.getSchema().getPrimaryKeyColumnNames().stream()
+                        .sorted()
+                        .collect(Collectors.toList());
+        if 
(!inferredPrimaryKeyColumnNames.equals(currentPrimaryKeyColumnNames)) {
+            throw new ValidationException(
+                    "The CDC create table event schema is not matched to 
current Fluss table schema. "
+                            + "\n New Fluss table's primary keys : "
+                            + inferredPrimaryKeyColumnNames
+                            + "\n Current Fluss's primary keys: "
+                            + currentPrimaryKeyColumnNames);
+        }
+
+        List<String> inferredBucketKeys = inferredFlussTable.getBucketKeys();
+        List<String> currentBucketKeys = currentTableInfo.getBucketKeys();
+        if (!inferredBucketKeys.equals(currentBucketKeys)) {
+            throw new ValidationException(
+                    "The CDC create table event schema is not matched to 
current Fluss table schema. "

Review Comment:
   Users may be misled by the current exception message: what does "CDC" refer to here? How about `The table schema inferred by Flink CDC does not match the current Fluss table schema.`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to