yuxiqian commented on code in PR #2916: URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1549225693
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.sink.MultiTableCommittable; +import org.apache.paimon.flink.sink.StoreSinkWrite; +import org.apache.paimon.memory.HeapMemorySegmentPool; +import org.apache.paimon.memory.MemoryPoolFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.ExecutorThreadFactory; + +import java.io.IOException; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +/** A {@link Sink} to write {@link DataChangeEvent} to Paimon storage. */ +public class PaimonWriter + implements TwoPhaseCommittingSink.PrecommittingSinkWriter<Event, MultiTableCommittable> { + + // use `static` because Catalog is unSerializable. + private static Catalog catalog; + private final IOManager ioManager; + + // maintain the latest schema of tableId + private final Map<TableId, TableSchemaInfo> schemaMaps; + // Each job can only have one user name and this name must be consistent across restarts. 
+ private final String commitUser; + // all table write should share one write buffer so that writers can preempt memory + // from those of other tables + private MemoryPoolFactory memoryPoolFactory; + private final Map<Identifier, FileStoreTable> tables; + private final Map<Identifier, StoreSinkWrite> writes; + private final ExecutorService compactExecutor; + private final MetricGroup metricGroup; + private final ZoneId zoneId; + + private final List<MultiTableCommittable> committables; + + public PaimonWriter( + Options catalogOptions, MetricGroup metricGroup, ZoneId zoneId, String commitUser) { + catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + this.metricGroup = metricGroup; + this.commitUser = commitUser; + this.tables = new HashMap<>(); + this.writes = new HashMap<>(); + this.schemaMaps = new HashMap<>(); + this.committables = new ArrayList<>(); + this.ioManager = new IOManagerAsync(); + this.compactExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory( + Thread.currentThread().getName() + "-CdcMultiWrite-Compaction")); + this.zoneId = zoneId; + } + + @Override + public Collection<MultiTableCommittable> prepareCommit() throws IOException { Review Comment: ```suggestion public Collection<MultiTableCommittable> prepareCommit() { ``` `IOException` never throws here ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink; + +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.factories.DataSinkFactory; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.common.utils.Preconditions; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.options.Options; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** A {@link DataSinkFactory} to create {@link PaimonDataSink}. 
*/ +public class PaimonDataSinkFactory implements DataSinkFactory { + + public static final String IDENTIFIER = "paimon"; + + @Override + public DataSink createDataSink(Context context) { + Map<String, String> allOptions = context.getFactoryConfiguration().toMap(); + Map<String, String> catalogOptions = new HashMap<>(); + Map<String, String> tableOptions = new HashMap<>(); + allOptions.forEach( + (key, value) -> { + if (key.startsWith(PaimonDataSinkOptions.PREFIX_TABLE_PROPERTIES)) { + tableOptions.put( + key.substring( + PaimonDataSinkOptions.PREFIX_TABLE_PROPERTIES.length()), + value); + } else if (key.startsWith(PaimonDataSinkOptions.PREFIX_CATALOG_PROPERTIES)) { + catalogOptions.put( + key.substring( + PaimonDataSinkOptions.PREFIX_CATALOG_PROPERTIES.length()), + value); + } + }); + Options options = Options.fromMap(catalogOptions); + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options); + Preconditions.checkNotNull(catalog.listDatabases(), "catalog option of Paimon is invalid."); + ZoneId zoneId = ZoneId.systemDefault(); + if (!Objects.equals( + context.getPipelineConfiguration().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE), + PipelineOptions.PIPELINE_LOCAL_TIME_ZONE.defaultValue())) { + zoneId = + ZoneId.of( + context.getPipelineConfiguration() + .get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE)); + } + String commitUser = + context.getFactoryConfiguration().get(PaimonDataSinkOptions.COMMIT_USER); + String partitionKey = + context.getFactoryConfiguration().get(PaimonDataSinkOptions.PARTITION_KEY); + Map<TableId, List<String>> partitionMaps = new HashMap<>(); + for (String tables : partitionKey.split(";")) { + String[] splits = tables.split(":"); + if (splits.length == 2) { + TableId tableId = TableId.parse(splits[0]); + List<String> partitions = Arrays.asList(splits[1].split(",")); + partitionMaps.put(tableId, partitions); + } Review Comment: Should we throw an exception or print a warning, instead of silently ignoring it, when a partition key format isn't valid (`splits.length != 2`)? ########## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java: ########## @@ -95,54 +95,102 @@ public static org.apache.flink.table.types.DataType toFlinkDataType(DataType typ int scale = DataTypes.getScale(type).orElse(0); switch (type.getTypeRoot()) { case CHAR: - return org.apache.flink.table.api.DataTypes.CHAR(length); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.CHAR(length) + : org.apache.flink.table.api.DataTypes.CHAR(length).notNull(); case VARCHAR: - return org.apache.flink.table.api.DataTypes.VARCHAR(length); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.VARCHAR(length) + : org.apache.flink.table.api.DataTypes.VARCHAR(length).notNull(); case BOOLEAN: - return org.apache.flink.table.api.DataTypes.BOOLEAN(); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.BOOLEAN() + : org.apache.flink.table.api.DataTypes.BOOLEAN().notNull(); case BINARY: - return org.apache.flink.table.api.DataTypes.BINARY(length); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.BINARY(length) + : org.apache.flink.table.api.DataTypes.BINARY(length).notNull(); case VARBINARY: - return org.apache.flink.table.api.DataTypes.VARBINARY(length); + return type.isNullable() + ? 
org.apache.flink.table.api.DataTypes.VARBINARY(length) + : org.apache.flink.table.api.DataTypes.VARBINARY(length).notNull(); case DECIMAL: - return org.apache.flink.table.api.DataTypes.DECIMAL(precision, scale); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.DECIMAL(precision, scale) + : org.apache.flink.table.api.DataTypes.DECIMAL(precision, scale).notNull(); case TINYINT: - return org.apache.flink.table.api.DataTypes.TINYINT(); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.TINYINT() + : org.apache.flink.table.api.DataTypes.TINYINT().notNull(); case SMALLINT: - return org.apache.flink.table.api.DataTypes.SMALLINT(); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.SMALLINT() + : org.apache.flink.table.api.DataTypes.SMALLINT().notNull(); case INTEGER: - return org.apache.flink.table.api.DataTypes.INT(); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.INT() + : org.apache.flink.table.api.DataTypes.INT().notNull(); case DATE: - return org.apache.flink.table.api.DataTypes.DATE(); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.DATE() + : org.apache.flink.table.api.DataTypes.DATE().notNull(); case TIME_WITHOUT_TIME_ZONE: - return org.apache.flink.table.api.DataTypes.TIME(precision); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.TIME(precision) + : org.apache.flink.table.api.DataTypes.TIME(precision).notNull(); case BIGINT: - return org.apache.flink.table.api.DataTypes.BIGINT(); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.BIGINT() + : org.apache.flink.table.api.DataTypes.BIGINT().notNull(); case FLOAT: - return org.apache.flink.table.api.DataTypes.FLOAT(); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.FLOAT() + : org.apache.flink.table.api.DataTypes.FLOAT().notNull(); case DOUBLE: - return org.apache.flink.table.api.DataTypes.DOUBLE(); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.DOUBLE() + : org.apache.flink.table.api.DataTypes.DOUBLE().notNull(); case TIMESTAMP_WITHOUT_TIME_ZONE: - case TIMESTAMP_WITH_TIME_ZONE: - return org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE(precision); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.TIMESTAMP(precision) + : org.apache.flink.table.api.DataTypes.TIMESTAMP(precision).notNull(); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE( - precision); + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE( + precision) + : org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE( + precision) + .notNull(); + case TIMESTAMP_WITH_TIME_ZONE: + return type.isNullable() + ? org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE(precision) + : org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE(precision) + .notNull(); case ARRAY: - Preconditions.checkState(children != null && !children.isEmpty()); - return org.apache.flink.table.api.DataTypes.ARRAY(toFlinkDataType(children.get(0))); + Preconditions.checkState(children != null && children.size() > 0); Review Comment: Why changing `children.isEmpty()` check to `children.size() > 0` since the previous one is clearer? 
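For illustration only (not part of the PR): the `DataTypeUtils` diff above repeats the same `type.isNullable()` ternary for every type root. A minimal sketch of how that pattern could be factored out, using a hypothetical `withNullability` helper name, might look like this:

```java
// Hypothetical helper, not from the PR: apply the CDC type's nullability to an
// already-mapped Flink DataType. Relies only on methods already used in the diff above:
// org.apache.flink.table.types.DataType#notNull() and the CDC DataType#isNullable().
private static org.apache.flink.table.types.DataType withNullability(
        org.apache.flink.table.types.DataType flinkType,
        org.apache.flink.cdc.common.types.DataType cdcType) {
    return cdcType.isNullable() ? flinkType : flinkType.notNull();
}

// Each case branch would then collapse to a single expression, e.g.:
// case CHAR:
//     return withNullability(org.apache.flink.table.api.DataTypes.CHAR(length), type);
```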
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink; + +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.factories.DataSinkFactory; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.common.utils.Preconditions; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.options.Options; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** A {@link DataSinkFactory} to create {@link PaimonDataSink}. */ +public class PaimonDataSinkFactory implements DataSinkFactory { + + public static final String IDENTIFIER = "paimon"; + + @Override + public DataSink createDataSink(Context context) { + Map<String, String> allOptions = context.getFactoryConfiguration().toMap(); + Map<String, String> catalogOptions = new HashMap<>(); + Map<String, String> tableOptions = new HashMap<>(); + allOptions.forEach( + (key, value) -> { + if (key.startsWith(PaimonDataSinkOptions.PREFIX_TABLE_PROPERTIES)) { + tableOptions.put( + key.substring( + PaimonDataSinkOptions.PREFIX_TABLE_PROPERTIES.length()), + value); + } else if (key.startsWith(PaimonDataSinkOptions.PREFIX_CATALOG_PROPERTIES)) { + catalogOptions.put( + key.substring( + PaimonDataSinkOptions.PREFIX_CATALOG_PROPERTIES.length()), + value); + } + }); + Options options = Options.fromMap(catalogOptions); + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options); + Preconditions.checkNotNull(catalog.listDatabases(), "catalog option of Paimon is invalid."); Review Comment: ```suggestion Preconditions.checkNotNull(catalog.listDatabases(), "catalog option of Paimon is invalid."); } catch (Exception ignored) { } ``` ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java: ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.SchemaChange; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A {@code MetadataApplier} that applies metadata changes to Paimon. Support primary key table + * only. + */ +public class PaimonMetadataApplier implements MetadataApplier { + + // Catalog is unSerializable. + private transient Catalog catalog; + + // currently, we set table options for all tables using the same options. 
+ private final Map<String, String> tableOptions; + + private final Options catalogOptions; + + private final Map<TableId, List<String>> partitionMaps; + + public PaimonMetadataApplier(Options catalogOptions) { + this.catalogOptions = catalogOptions; + this.tableOptions = new HashMap<>(); + this.partitionMaps = new HashMap<>(); + } + + public PaimonMetadataApplier( + Options catalogOptions, + Map<String, String> tableOptions, + Map<TableId, List<String>> partitionMaps) { + this.catalogOptions = catalogOptions; + this.tableOptions = tableOptions; + this.partitionMaps = partitionMaps; + } + + @Override + public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { + if (catalog == null) { + catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + } + try { + if (schemaChangeEvent instanceof CreateTableEvent) { + applyCreateTable((CreateTableEvent) schemaChangeEvent); + } else if (schemaChangeEvent instanceof AddColumnEvent) { + applyAddColumn((AddColumnEvent) schemaChangeEvent); + } else if (schemaChangeEvent instanceof DropColumnEvent) { + applyDropColumn((DropColumnEvent) schemaChangeEvent); + } else if (schemaChangeEvent instanceof RenameColumnEvent) { + applyRenameColumn((RenameColumnEvent) schemaChangeEvent); + } else if (schemaChangeEvent instanceof AlterColumnTypeEvent) { + applyAlterColumn((AlterColumnTypeEvent) schemaChangeEvent); + } else { + throw new UnsupportedOperationException( + "PaimonDataSink doesn't support schema change event " + schemaChangeEvent); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** TODO support partition column. */ Review Comment: Is partition column support still in TODO stage? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.api.connector.sink2.Committer; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.sink.MultiTableCommittable; +import org.apache.paimon.flink.sink.StoreMultiCommitter; +import org.apache.paimon.manifest.WrappedManifestCommittable; +import org.apache.paimon.options.Options; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** A {@link Committer} to commit write results for multiple tables. 
*/ +public class PaimonCommitter implements Committer<MultiTableCommittable> { + + private final StoreMultiCommitter storeMultiCommitter; + + public PaimonCommitter(Options catalogOptions, String commitUser) { + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + // flinkMetricGroup could be passed after FLIP-371. + storeMultiCommitter = new StoreMultiCommitter(() -> catalog, commitUser, null); + } + + @Override + public void commit(Collection<CommitRequest<MultiTableCommittable>> commitRequests) + throws IOException, InterruptedException { + if (commitRequests.isEmpty()) { + return; + } + List<MultiTableCommittable> committables = + commitRequests.stream() + .map(CommitRequest::getCommittable) + .collect(Collectors.toList()); + long checkpointId = committables.get(0).checkpointId(); + WrappedManifestCommittable wrappedManifestCommittable = + storeMultiCommitter.combine(checkpointId, 1L, committables); + storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable)); Review Comment: CMIIW, will all commit requests in one single batch share the same checkpoint id? Seems this code just picks from the first `Committable` and pack the rest into one single `WrappedManifestCommittable` with a shared Checkpoint ID. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.RowKind; + +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; + +/** A helper class for {@link PaimonWriter} to create FieldGetter and GenericRow. */ +public class PaimonWriterHelper { + + /** create a list of {@link RecordData.FieldGetter} for {@link PaimonWriter}. 
*/ + public static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) { + List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(schema.getColumns().size()); + for (int i = 0; i < schema.getColumns().size(); i++) { + fieldGetters.add(createFieldGetter(schema.getColumns().get(i).getType(), i, zoneId)); Review Comment: `getColumns()` is called on every iteration of the loop. Consider replacing the loop with `forEach`, or at least caching the columns list? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.schema.Schema; + +import java.time.ZoneId; +import java.util.List; + +/** Keep a list of {@link RecordData.FieldGetter} for a specific {@link Schema}. */ +public class TableSchemaInfo { Review Comment: This class keeps a bundle of `FieldGetter`s. `PaimonWriter` recreates all getters every time a `SchemaChangeEvent` occurs. Maybe we can provide a way to update `TableSchemaInfo` partially? Though it might not improve performance greatly, it would make the class more useful and flexible. In `PaimonWriter` we could then write something like: ```java someTableSchemaInfo.apply(schemaChangeEvent) ``` ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import org.apache.paimon.flink.sink.MultiTableCommittable; + +import java.util.ArrayList; +import java.util.List; + +/** An Operator to add checkpointId to MultiTableCommittable and generate CommittableSummary. */ +public class PreCommitOperator + extends AbstractStreamOperator<CommittableMessage<MultiTableCommittable>> + implements OneInputStreamOperator< + CommittableMessage<MultiTableCommittable>, + CommittableMessage<MultiTableCommittable>> { + + /** store a list of MultiTableCommittable in one checkpoint. */ + private final List<MultiTableCommittable> results; + + public PreCommitOperator() { + results = new ArrayList<>(); + } + + @Override + public void open() throws Exception { + super.open(); + } + + @Override + public void processElement(StreamRecord<CommittableMessage<MultiTableCommittable>> element) { + if (element.isRecord() && element.getValue() instanceof CommittableWithLineage) { Review Comment: Is this `element.isRecord()` check necessary? The function signature has limited `element` to be a `StreamRecord`, so `isRecord` check should never be `false`. ########## docs/content/pipelines/paimon-pipeline(ZH).md: ########## @@ -0,0 +1,209 @@ +# Paimon Pipeline 连接器 + +Paimon Pipeline 连接器可以用作 Pipeline 的 *Data Sink*,将数据写入[Paimon](https://paimon.apache.org)。 本文档介绍如何设置 Paimon Pipeline 连接器。 + +## 连接器的功能 +* 自动建表 +* 表结构变更同步 +* 数据实时同步 + +如何创建 Pipeline +---------------- + +从 MySQL 读取数据同步到 Paimon 的 Pipeline 可以定义如下: + +```yaml +source: + type: mysql + name: MySQL Source + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5401-5404 + +sink: + type: paimon + name: Paimon Sink + metastore: filesystem + warehouse: /path/warehouse + +pipeline: + name: MySQL to Paimon Pipeline + parallelism: 2 +``` + +Pipeline 连接器配置项 +---------------- +<div class="highlight"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Option</th> + <th class="text-left" style="width: 8%">Required</th> + <th class="text-left" style="width: 7%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 50%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td>type</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>指定要使用的连接器, 这里需要设置成 <code>'paimon'</code>.</td> + </tr> + <tr> + <td>name</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Sink 的名称.</td> + </tr> + <tr> + <td>catalog.properties.metastore</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>用于构建 Paimon Catalog 的类型。可选填值 filesystem 或者 hive。</td> + </tr> + <tr> + <td>catalog.properties.warehouse</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Paimon 仓库存储数据的根目录。</td> + </tr> + <tr> + <td>catalog.properties.uri</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + 
<td>String</td> + <td>Hive metastore 的 uri,在 metastore 设置为 hive 的时候需要。</td> + </tr> + <tr> + <td>commit.user</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>提交数据文件时的用户名, 默认值为 `admin`.</td> + </tr> + <tr> + <td>partition.key</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>设置每个分区表的分区字段,允许填写成多个分区表的多个分区字段。 不同的表使用 ';'分割, 而不同的字段则使用 ','分割。举个例子, 我们可以为两张表的不同分区字段作如下的设置 'testdb.table1:id1,id2;testdb.table2:name'。</td> + </tr> + <tr> + <td>catalog.properties.*</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>将 Paimon catalog 支持的参数传递给 pipeline,参考 <a href="https://paimon.apache.org/docs/master/maintenance/configurations/#catalogoptions">Paimon catalog options</a>。 </td> + </tr> + <tr> + <td>table.properties.*</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>将 Paimon table 支持的参数传递给 pipeline,参考 <a href="https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions">Paimon table options</a>。 </td> + </tr> + </tbody> +</table> +</div> + +使用说明 +-------- + +* 只支持主键表,因此源表必须有主键 + +* 暂不支持 exactly-once,连接器 通过 at-least-once 和主键表实现幂等写 + +数据类型映射 +---------------- +<div class="wy-table-responsive"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left">CDC type</th> + <th class="text-left">Paimon type</th> + <th class="text-left" style="width:60%;">NOTE</th> + </tr> + </thead> + <tbody> + <tr> + <td>TINYINT</td> + <td>TINYINT</td> + <td></td> + </tr> + <tr> + <td>SMALLINT</td> + <td>SMALLINT</td> + <td></td> + </tr> + <tr> + <td>INT</td> + <td>INT</td> + <td></td> + </tr> + <tr> + <td>BIGINT</td> + <td>BIGINT</td> + <td></td> + </tr> + <tr> + <td>FLOAT</td> + <td>FLOAT</td> + <td></td> + </tr> + <tr> + <td>DOUBLE</td> + <td>DOUBLE</td> + <td></td> + </tr> + <tr> + <td>DECIMAL(p, s)</td> + <td>DECIMAL(p, s)</td> + <td></td> + </tr> + <tr> + <td>BOOLEAN</td> + <td>BOOLEAN</td> + <td></td> + </tr> + <tr> + <td>DATE</td> + <td>DATE</td> + <td></td> + </tr> + <tr> + <td>TIMESTAMP</td> + <td>DATETIME</td> + <td></td> + </tr> + <tr> + <td>TIMESTAMP_LTZ</td> + <td>TIMESTAMP_LTZ</td> + <td></td> + </tr> + <tr> + <td>CHAR(n)</td> + <td>CHAR(n)</td> + <td></td> + </tr> + <tr> + <td>VARCHAR(n)</td> + <td>VARCHAR(n)</td> + <td></td> + </tr> + </tbody> +</table> +</div> + +FAQ +-------- +* [FAQ(English)](https://github.com/ververica/flink-cdc-connectors/wiki/FAQ) +* [FAQ(中文)](https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)) Review Comment: Stale hyperlinks. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; + +import org.apache.paimon.flink.sink.MultiTableCommittable; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** A test for {@link MultiTableCommittableChannelComputer}. */ +public class TestMultiTableCommittableChannelComputer { + + @Test + public void testChannel() { + MultiTableCommittableChannelComputer computer = new MultiTableCommittableChannelComputer(); + computer.setup(4); + List<MultiTableCommittable> commits = + Arrays.asList( + new MultiTableCommittable("database", "table1", 1L, null, null), + new MultiTableCommittable("database", "table2", 1L, null, null), + new MultiTableCommittable("database", "table1", 1L, null, null), + new MultiTableCommittable("database", "table5", 1L, null, null), + new MultiTableCommittable("database", "table3", 1L, null, null), + new MultiTableCommittable("database", "table8", 1L, null, null), + new MultiTableCommittable("database", "table5", 1L, null, null), + new MultiTableCommittable("database", "table1", 1L, null, null), + new MultiTableCommittable("database", "table9", 1L, null, null), + new MultiTableCommittable("database", "table5", 1L, null, null), + new MultiTableCommittable("database", "table3", 1L, null, null), + new MultiTableCommittable("database", "table8", 1L, null, null)); + Map<Integer, Set<String>> map = new HashMap<>(); + commits.forEach( + (commit) -> { + int channel = computer.channel(new CommittableWithLineage<>(commit, 1L, 0)); + Set<String> set = map.getOrDefault(channel, new HashSet<>()); + set.add(commit.getTable()); + map.put(channel, set); + }); + int count = 0; + for (Map.Entry<Integer, Set<String>> entry : map.entrySet()) { + count += entry.getValue().size(); + } + // Not a table is appeared in more than one channel. + Assertions.assertEquals(6, count); Review Comment: Only `MultiTableCommittable`s with lineage are tested. Maybe also test plain committable messages? ########## docs/content/pipelines/paimon-pipeline.md: ########## @@ -0,0 +1,209 @@ +# Paimon Pipeline Connector + +The Paimon Pipeline connector can be used as the *Data Sink* of the pipeline, and write data to [Paimon](https://paimon.apache.org). This document describes how to set up the Paimon Pipeline connector. + +## What can the connector do? 
+* Create table automatically if not exist +* Schema change synchronization +* Data synchronization + +How to create Pipeline +---------------- + +The pipeline for reading data from MySQL and sink to Paimon can be defined as follows: + +```yaml +source: + type: mysql + name: MySQL Source + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5401-5404 + +sink: + type: paimon + name: Paimon Sink + metastore: filesystem + warehouse: /path/warehouse + +pipeline: + name: MySQL to Paimon Pipeline + parallelism: 2 +``` + +Pipeline Connector Options +---------------- +<div class="highlight"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Option</th> + <th class="text-left" style="width: 8%">Required</th> + <th class="text-left" style="width: 7%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 50%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td>type</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Specify what connector to use, here should be <code>'paimon'</code>.</td> + </tr> + <tr> + <td>name</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The name of the sink.</td> + </tr> + <tr> + <td>catalog.properties.metastore</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Metastore of paimon catalog, supports filesystem and hive.</td> + </tr> + <tr> + <td>catalog.properties.warehouse</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The warehouse root path of catalog.</td> + </tr> + <tr> + <td>catalog.properties.uri</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Uri of metastore server.</td> + </tr> + <tr> + <td>commit.user</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>User name for committing data files, default value is `admin`.</td> + </tr> Review Comment: ```suggestion <tr> <td>commit.user</td> <td>optional</td> <td style="word-wrap: break-word;"><code>"admin"</code></td> <td>String</td> <td>User name for committing data files.</td> </tr> ``` ########## docs/content/pipelines/paimon-pipeline.md: ########## @@ -0,0 +1,209 @@ +# Paimon Pipeline Connector + +The Paimon Pipeline connector can be used as the *Data Sink* of the pipeline, and write data to [Paimon](https://paimon.apache.org). This document describes how to set up the Paimon Pipeline connector. + +## What can the connector do? 
+* Create table automatically if not exist +* Schema change synchronization +* Data synchronization + +How to create Pipeline +---------------- + +The pipeline for reading data from MySQL and sink to Paimon can be defined as follows: + +```yaml +source: + type: mysql + name: MySQL Source + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5401-5404 + +sink: + type: paimon + name: Paimon Sink + metastore: filesystem + warehouse: /path/warehouse + +pipeline: + name: MySQL to Paimon Pipeline + parallelism: 2 +``` + +Pipeline Connector Options +---------------- +<div class="highlight"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Option</th> + <th class="text-left" style="width: 8%">Required</th> + <th class="text-left" style="width: 7%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 50%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td>type</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Specify what connector to use, here should be <code>'paimon'</code>.</td> + </tr> + <tr> + <td>name</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The name of the sink.</td> + </tr> + <tr> + <td>catalog.properties.metastore</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Metastore of paimon catalog, supports filesystem and hive.</td> + </tr> + <tr> + <td>catalog.properties.warehouse</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The warehouse root path of catalog.</td> + </tr> + <tr> + <td>catalog.properties.uri</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Uri of metastore server.</td> + </tr> + <tr> + <td>commit.user</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>User name for committing data files, default value is `admin`.</td> + </tr> + <tr> + <td>partition.key</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Partition keys for each partitioned table, allow setting multiple primary keys for multiTables. Each table are separated by ';', and each partition key are separated by ','. For example, we can set partition.key of two tables by 'testdb.table1:id1,id2;testdb.table2:name'.</td> + </tr> + <tr> + <td>catalog.properties.*</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Pass options of Paimon catalog to pipeline,See <a href="https://paimon.apache.org/docs/master/maintenance/configurations/#catalogoptions">Paimon catalog options</a>。 </td> + </tr> + <tr> + <td>table.properties.*</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Pass options of Paimon table to pipeline,See <a href="https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions">Paimon table options</a>。 </td> + </tr> + </tbody> +</table> +</div> + +Usage Notes +-------- + +* Only support Paimon primary key table, so the source table must have primary keys. + +* Not support exactly-once. The connector uses at-least-once + primary key table for idempotent writing. 
+ +Data Type Mapping +---------------- +<div class="wy-table-responsive"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left">CDC type</th> + <th class="text-left">Paimon type</th> + <th class="text-left" style="width:60%;">NOTE</th> + </tr> + </thead> + <tbody> + <tr> + <td>TINYINT</td> + <td>TINYINT</td> + <td></td> + </tr> + <tr> + <td>SMALLINT</td> + <td>SMALLINT</td> + <td></td> + </tr> + <tr> + <td>INT</td> + <td>INT</td> + <td></td> + </tr> + <tr> + <td>BIGINT</td> + <td>BIGINT</td> + <td></td> + </tr> + <tr> + <td>FLOAT</td> + <td>FLOAT</td> + <td></td> + </tr> + <tr> + <td>DOUBLE</td> + <td>DOUBLE</td> + <td></td> + </tr> + <tr> + <td>DECIMAL(p, s)</td> + <td>DECIMAL(p, s)</td> + <td></td> + </tr> + <tr> + <td>BOOLEAN</td> + <td>BOOLEAN</td> + <td></td> + </tr> + <tr> + <td>DATE</td> + <td>DATE</td> + <td></td> + </tr> + <tr> + <td>TIMESTAMP</td> + <td>TIMESTAMP</td> + <td></td> + </tr> + <tr> + <td>TIMESTAMP_LTZ</td> + <td>TIMESTAMP_LTZ</td> + <td></td> + </tr> + <tr> + <td>CHAR(n)</td> + <td>CHAR(n)</td> + <td></td> + </tr> + <tr> + <td>VARCHAR(n)</td> + <td>VARCHAR(n)</td> + <td></td> + </tr> + </tbody> +</table> +</div> + +FAQ +-------- +* [FAQ(English)](https://github.com/ververica/flink-cdc-connectors/wiki/FAQ) +* [FAQ(中文)](https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)) Review Comment: Stale hyperlinks. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.sink.MultiTableCommittable; +import org.apache.paimon.flink.sink.StoreSinkWrite; +import org.apache.paimon.memory.HeapMemorySegmentPool; +import org.apache.paimon.memory.MemoryPoolFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.ExecutorThreadFactory; + +import java.io.IOException; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +/** A {@link Sink} to write {@link DataChangeEvent} to Paimon storage. */ +public class PaimonWriter + implements TwoPhaseCommittingSink.PrecommittingSinkWriter<Event, MultiTableCommittable> { + + // use `static` because Catalog is unSerializable. + private static Catalog catalog; + private final IOManager ioManager; + + // maintain the latest schema of tableId + private final Map<TableId, TableSchemaInfo> schemaMaps; + // Each job can only have one user name and this name must be consistent across restarts. 
+ private final String commitUser; + // all table write should share one write buffer so that writers can preempt memory + // from those of other tables + private MemoryPoolFactory memoryPoolFactory; + private final Map<Identifier, FileStoreTable> tables; + private final Map<Identifier, StoreSinkWrite> writes; + private final ExecutorService compactExecutor; + private final MetricGroup metricGroup; + private final ZoneId zoneId; + + private final List<MultiTableCommittable> committables; + + public PaimonWriter( + Options catalogOptions, MetricGroup metricGroup, ZoneId zoneId, String commitUser) { + catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + this.metricGroup = metricGroup; + this.commitUser = commitUser; + this.tables = new HashMap<>(); + this.writes = new HashMap<>(); + this.schemaMaps = new HashMap<>(); + this.committables = new ArrayList<>(); + this.ioManager = new IOManagerAsync(); + this.compactExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory( + Thread.currentThread().getName() + "-CdcMultiWrite-Compaction")); + this.zoneId = zoneId; + } + + @Override + public Collection<MultiTableCommittable> prepareCommit() throws IOException { + Collection<MultiTableCommittable> allCommittables = new ArrayList<>(committables); + committables.clear(); + return allCommittables; + } + + @Override + public void write(Event event, Context context) throws IOException { + if (event instanceof SchemaChangeEvent) { + if (event instanceof CreateTableEvent) { + CreateTableEvent createTableEvent = (CreateTableEvent) event; + schemaMaps.put( + createTableEvent.tableId(), + new TableSchemaInfo(createTableEvent.getSchema(), zoneId)); + } else { + SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; + schemaMaps.put( + schemaChangeEvent.tableId(), + new TableSchemaInfo( + SchemaUtils.applySchemaChangeEvent( + schemaMaps.get(schemaChangeEvent.tableId()).getSchema(), + schemaChangeEvent), + zoneId)); + + Identifier tableId = + Identifier.create( + schemaChangeEvent.tableId().getSchemaName(), + schemaChangeEvent.tableId().getTableName()); + // remove the table temporarily, then add the table with latest schema when received + // DataChangeEvent. 
+ writes.remove(tableId); + tables.remove(tableId); + } + } else if (event instanceof DataChangeEvent) { + DataChangeEvent dataChangeEvent = (DataChangeEvent) event; + Identifier tableId = + Identifier.create( + dataChangeEvent.tableId().getSchemaName(), + dataChangeEvent.tableId().getTableName()); + + FileStoreTable table; + table = getTable(tableId); + + if (memoryPoolFactory == null) { + memoryPoolFactory = + new MemoryPoolFactory( + // currently, the options of all tables are the same in CDC + new HeapMemorySegmentPool( + table.coreOptions().writeBufferSize(), + table.coreOptions().pageSize())); + } + + StoreSinkWrite write = + writes.computeIfAbsent( + tableId, + id -> { + StoreSinkWriteImpl storeSinkWrite = + new StoreSinkWriteImpl( + table, + commitUser, + ioManager, + false, + false, + true, + memoryPoolFactory, + metricGroup); + storeSinkWrite.withCompactExecutor(compactExecutor); + return storeSinkWrite; + }); + + GenericRow genericRow = + PaimonWriterHelper.convertEventToGenericRow( + dataChangeEvent, + schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters()); + try { + write.write(genericRow); + } catch (Exception e) { + throw new IOException(e); + } + } + } + + private FileStoreTable getTable(Identifier tableId) { + FileStoreTable table = tables.get(tableId); + if (table == null) { + try { + table = (FileStoreTable) catalog.getTable(tableId); + } catch (Exception e) { + throw new RuntimeException(e); + } + tables.put(tableId, table); + } Review Comment: ```suggestion FileStoreTable table = tables.computeIfAbsent(tableId, id -> { try { return (FileStoreTable) catalog.getTable(tableId); } catch (Exception e) { throw new RuntimeException(e); } }); ``` This logic seems identical to `computeIfAbsent`. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.api.connector.sink2.Committer; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.sink.MultiTableCommittable; +import org.apache.paimon.flink.sink.StoreMultiCommitter; +import org.apache.paimon.manifest.WrappedManifestCommittable; +import org.apache.paimon.options.Options; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** A {@link Committer} to commit write results for multiple tables. 
*/ +public class PaimonCommitter implements Committer<MultiTableCommittable> { + + private final StoreMultiCommitter storeMultiCommitter; + + public PaimonCommitter(Options catalogOptions, String commitUser) { + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + // flinkMetricGroup could be passed after FLIP-371. + storeMultiCommitter = new StoreMultiCommitter(() -> catalog, commitUser, null); Review Comment: ```suggestion // flinkMetricGroup could be passed after FLIP-371. storeMultiCommitter = new StoreMultiCommitter(() -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions), commitUser, null); ``` What about moving catalog initialisation into the closure? Seems catalog lifecycle is fully managed by `StoreMultiCommitter`, and that omits the temp variable & IDE warning. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink; + +import org.apache.flink.cdc.common.configuration.ConfigOption; + +import org.apache.paimon.options.CatalogOptions; + +import static org.apache.flink.cdc.common.configuration.ConfigOptions.key; + +/** copy from {@link CatalogOptions}. Options for {@link PaimonDataSink}. */ +public class PaimonDataSinkOptions { + + // prefix for passing properties for table creation. + public static final String PREFIX_TABLE_PROPERTIES = "table.properties."; + + // prefix for passing properties for catalog creation. + public static final String PREFIX_CATALOG_PROPERTIES = "catalog.properties."; + + public static final ConfigOption<String> COMMIT_USER = + key("commit.user") + .stringType() + .defaultValue("admin") + .withDescription("User name for committing data files."); + + public static final ConfigOption<String> WAREHOUSE = + key("catalog.properties.warehouse") + .stringType() + .noDefaultValue() + .withDescription("The warehouse root path of catalog."); + + public static final ConfigOption<String> METASTORE = + key("catalog.properties.metastore") + .stringType() + .defaultValue("filesystem") + .withDescription("Metastore of paimon catalog, supports filesystem and hive."); + + public static final ConfigOption<String> URI = + key("catalog.properties.uri") + .stringType() + .noDefaultValue() + .withDescription("Uri of metastore server."); + + public static final ConfigOption<String> PARTITION_KEY = + key("partition.key") + .stringType() + .defaultValue("") + .withDescription( + "Partition keys for each partitioned table, allow setting multiple primary keys for multiTables. 
" + + "Each table are separated by ';', and each partition key are separated by ','. " Review Comment: ```suggestion + "Tables are separated by ';', and partition keys are separated by ','. " ``` Minor grammar fix ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink; + +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.factories.DataSinkFactory; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.common.utils.Preconditions; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.options.Options; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** A {@link DataSinkFactory} to create {@link PaimonDataSink}. */ +public class PaimonDataSinkFactory implements DataSinkFactory { + + public static final String IDENTIFIER = "paimon"; + + @Override + public DataSink createDataSink(Context context) { + Map<String, String> allOptions = context.getFactoryConfiguration().toMap(); + Map<String, String> catalogOptions = new HashMap<>(); + Map<String, String> tableOptions = new HashMap<>(); + allOptions.forEach( + (key, value) -> { + if (key.startsWith(PaimonDataSinkOptions.PREFIX_TABLE_PROPERTIES)) { + tableOptions.put( + key.substring( + PaimonDataSinkOptions.PREFIX_TABLE_PROPERTIES.length()), + value); + } else if (key.startsWith(PaimonDataSinkOptions.PREFIX_CATALOG_PROPERTIES)) { + catalogOptions.put( + key.substring( + PaimonDataSinkOptions.PREFIX_CATALOG_PROPERTIES.length()), + value); + } + }); + Options options = Options.fromMap(catalogOptions); + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options); Review Comment: `catalog` is never used after here. Maybe `close()` it to free resources? ```suggestion try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options)) { ``` ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.sink.MultiTableCommittable; +import org.apache.paimon.flink.sink.StoreSinkWrite; +import org.apache.paimon.memory.HeapMemorySegmentPool; +import org.apache.paimon.memory.MemoryPoolFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.ExecutorThreadFactory; + +import java.io.IOException; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +/** A {@link Sink} to write {@link DataChangeEvent} to Paimon storage. */ +public class PaimonWriter + implements TwoPhaseCommittingSink.PrecommittingSinkWriter<Event, MultiTableCommittable> { + + // use `static` because Catalog is unSerializable. + private static Catalog catalog; + private final IOManager ioManager; + + // maintain the latest schema of tableId + private final Map<TableId, TableSchemaInfo> schemaMaps; + // Each job can only have one user name and this name must be consistent across restarts. 
+ private final String commitUser; + // all table write should share one write buffer so that writers can preempt memory + // from those of other tables + private MemoryPoolFactory memoryPoolFactory; + private final Map<Identifier, FileStoreTable> tables; + private final Map<Identifier, StoreSinkWrite> writes; + private final ExecutorService compactExecutor; + private final MetricGroup metricGroup; + private final ZoneId zoneId; + + private final List<MultiTableCommittable> committables; + + public PaimonWriter( + Options catalogOptions, MetricGroup metricGroup, ZoneId zoneId, String commitUser) { + catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + this.metricGroup = metricGroup; + this.commitUser = commitUser; + this.tables = new HashMap<>(); + this.writes = new HashMap<>(); + this.schemaMaps = new HashMap<>(); + this.committables = new ArrayList<>(); + this.ioManager = new IOManagerAsync(); + this.compactExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory( + Thread.currentThread().getName() + "-CdcMultiWrite-Compaction")); + this.zoneId = zoneId; + } + + @Override + public Collection<MultiTableCommittable> prepareCommit() throws IOException { + Collection<MultiTableCommittable> allCommittables = new ArrayList<>(committables); + committables.clear(); + return allCommittables; + } + + @Override + public void write(Event event, Context context) throws IOException { + if (event instanceof SchemaChangeEvent) { + if (event instanceof CreateTableEvent) { + CreateTableEvent createTableEvent = (CreateTableEvent) event; + schemaMaps.put( + createTableEvent.tableId(), + new TableSchemaInfo(createTableEvent.getSchema(), zoneId)); + } else { + SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; + schemaMaps.put( + schemaChangeEvent.tableId(), + new TableSchemaInfo( + SchemaUtils.applySchemaChangeEvent( + schemaMaps.get(schemaChangeEvent.tableId()).getSchema(), + schemaChangeEvent), + zoneId)); + + Identifier tableId = + Identifier.create( + schemaChangeEvent.tableId().getSchemaName(), + schemaChangeEvent.tableId().getTableName()); + // remove the table temporarily, then add the table with latest schema when received + // DataChangeEvent. 
+ writes.remove(tableId); + tables.remove(tableId); + } + } else if (event instanceof DataChangeEvent) { + DataChangeEvent dataChangeEvent = (DataChangeEvent) event; + Identifier tableId = + Identifier.create( + dataChangeEvent.tableId().getSchemaName(), + dataChangeEvent.tableId().getTableName()); + + FileStoreTable table; + table = getTable(tableId); + + if (memoryPoolFactory == null) { + memoryPoolFactory = + new MemoryPoolFactory( + // currently, the options of all tables are the same in CDC + new HeapMemorySegmentPool( + table.coreOptions().writeBufferSize(), + table.coreOptions().pageSize())); + } + + StoreSinkWrite write = + writes.computeIfAbsent( + tableId, + id -> { + StoreSinkWriteImpl storeSinkWrite = + new StoreSinkWriteImpl( + table, + commitUser, + ioManager, + false, + false, + true, + memoryPoolFactory, + metricGroup); + storeSinkWrite.withCompactExecutor(compactExecutor); + return storeSinkWrite; + }); + + GenericRow genericRow = + PaimonWriterHelper.convertEventToGenericRow( + dataChangeEvent, + schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters()); + try { + write.write(genericRow); + } catch (Exception e) { + throw new IOException(e); + } + } + } + + private FileStoreTable getTable(Identifier tableId) { + FileStoreTable table = tables.get(tableId); + if (table == null) { + try { + table = (FileStoreTable) catalog.getTable(tableId); + } catch (Exception e) { + throw new RuntimeException(e); + } + tables.put(tableId, table); + } + + if (table.bucketMode() != BucketMode.FIXED) { + throw new UnsupportedOperationException( + "Unified Sink only supports FIXED bucket mode, but is " + table.bucketMode()); + } + return table; + } + + /** + * Called on checkpoint or end of input so that the writer to flush all pending data for + * at-least-once. + * + * <p>this method will also be called when receiving {@link FlushEvent}, but we don't need to + * commit the MultiTableCommittables immediately in this case, because {@link PaimonCommitter} + * support committing datas of different schemas. Review Comment: ```suggestion * support committing data of different schemas. ``` ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.RowKind; + +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; + +/** A helper class for {@link PaimonWriter} to create FieldGetter and GenericRow. */ +public class PaimonWriterHelper { + + /** create a list of {@link RecordData.FieldGetter} for {@link PaimonWriter}. */ + public static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) { + List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(schema.getColumns().size()); + for (int i = 0; i < schema.getColumns().size(); i++) { + fieldGetters.add(createFieldGetter(schema.getColumns().get(i).getType(), i, zoneId)); + } + return fieldGetters; + } + + private static RecordData.FieldGetter createFieldGetter( + DataType fieldType, int fieldPos, ZoneId zoneId) { + final RecordData.FieldGetter fieldGetter; + // ordered by type root definition + switch (fieldType.getTypeRoot()) { + case CHAR: + case VARCHAR: + fieldGetter = row -> BinaryString.fromString(row.getString(fieldPos).toString()); + break; + case BOOLEAN: + fieldGetter = row -> row.getBoolean(fieldPos); + break; + case BINARY: + case VARBINARY: + fieldGetter = row -> row.getBinary(fieldPos); + break; + case DECIMAL: + final int decimalPrecision = DataTypeChecks.getPrecision(fieldType); + final int decimalScale = DataTypeChecks.getScale(fieldType); + fieldGetter = + row -> { + DecimalData decimalData = + row.getDecimal(fieldPos, decimalPrecision, decimalScale); + return Decimal.fromBigDecimal( + decimalData.toBigDecimal(), decimalPrecision, decimalScale); + }; + break; + case TINYINT: + fieldGetter = row -> row.getByte(fieldPos); + break; + case SMALLINT: + fieldGetter = row -> row.getShort(fieldPos); + break; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + fieldGetter = row -> row.getInt(fieldPos); + break; + case BIGINT: + fieldGetter = row -> row.getLong(fieldPos); + break; + case FLOAT: + fieldGetter = row -> row.getFloat(fieldPos); + break; + case DOUBLE: + fieldGetter = row -> row.getDouble(fieldPos); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + fieldGetter = + row -> + Timestamp.fromSQLTimestamp( + row.getTimestamp( + fieldPos, + DataTypeChecks.getPrecision(fieldType)) + .toTimestamp()); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case TIMESTAMP_WITH_TIME_ZONE: + fieldGetter = + row -> + Timestamp.fromLocalDateTime( + ZonedDateTime.ofInstant( + row.getLocalZonedTimestampData( + fieldPos, + DataTypeChecks.getPrecision( + fieldType)) + .toInstant(), + zoneId) + .toLocalDateTime()); + break; Review Comment: Is `ARRAY`, `MAP` and `ROW` omitted intentionally here? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.RowKind; + +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; + +/** A helper class for {@link PaimonWriter} to create FieldGetter and GenericRow. */ +public class PaimonWriterHelper { + + /** create a list of {@link RecordData.FieldGetter} for {@link PaimonWriter}. */ + public static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) { + List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(schema.getColumns().size()); + for (int i = 0; i < schema.getColumns().size(); i++) { + fieldGetters.add(createFieldGetter(schema.getColumns().get(i).getType(), i, zoneId)); + } + return fieldGetters; + } + + private static RecordData.FieldGetter createFieldGetter( + DataType fieldType, int fieldPos, ZoneId zoneId) { + final RecordData.FieldGetter fieldGetter; + // ordered by type root definition + switch (fieldType.getTypeRoot()) { + case CHAR: + case VARCHAR: + fieldGetter = row -> BinaryString.fromString(row.getString(fieldPos).toString()); + break; + case BOOLEAN: + fieldGetter = row -> row.getBoolean(fieldPos); + break; + case BINARY: + case VARBINARY: + fieldGetter = row -> row.getBinary(fieldPos); + break; + case DECIMAL: + final int decimalPrecision = DataTypeChecks.getPrecision(fieldType); + final int decimalScale = DataTypeChecks.getScale(fieldType); + fieldGetter = + row -> { + DecimalData decimalData = + row.getDecimal(fieldPos, decimalPrecision, decimalScale); + return Decimal.fromBigDecimal( + decimalData.toBigDecimal(), decimalPrecision, decimalScale); + }; + break; + case TINYINT: + fieldGetter = row -> row.getByte(fieldPos); + break; + case SMALLINT: + fieldGetter = row -> row.getShort(fieldPos); + break; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + fieldGetter = row -> row.getInt(fieldPos); + break; Review Comment: In root type definition, `DATE` and `TIME_WITHOUT_TIME_ZONE` type seems located after numeric types. Should we keep the ordering here, too? 
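For illustration, a minimal sketch of that reordering, covering only the numeric and date/time roots (the class and method names here are made up; the getters themselves are copied unchanged from the snippet above):

```java
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.types.DataType;

/** Sketch only: numeric roots first, then DATE/TIME, mirroring the type root definition order. */
class FieldGetterOrderingSketch {

    static RecordData.FieldGetter createNumericOrTemporalFieldGetter(
            DataType fieldType, int fieldPos) {
        switch (fieldType.getTypeRoot()) {
            case TINYINT:
                return row -> row.getByte(fieldPos);
            case SMALLINT:
                return row -> row.getShort(fieldPos);
            case INTEGER:
                return row -> row.getInt(fieldPos);
            case BIGINT:
                return row -> row.getLong(fieldPos);
            case FLOAT:
                return row -> row.getFloat(fieldPos);
            case DOUBLE:
                return row -> row.getDouble(fieldPos);
            case DATE:
            case TIME_WITHOUT_TIME_ZONE:
                // stored as int like INTEGER, but listed after the numeric group
                return row -> row.getInt(fieldPos);
            default:
                throw new IllegalArgumentException(
                        "This sketch only covers numeric and date/time roots: "
                                + fieldType.getTypeRoot());
        }
    }
}
```

Behaviour is unchanged either way; this is purely about keeping the `switch` readable against the type root definition.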
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink; + +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.factories.DataSinkFactory; +import org.apache.flink.cdc.common.factories.FactoryHelper; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils; + +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.UUID; + +/** Tests for {@link PaimonDataSinkFactory}. */ +public class PaimonDataSinkFactoryTest { + + @TempDir public static java.nio.file.Path temporaryFolder; + + @Test + public void testCreateDataSink() { + DataSinkFactory sinkFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier("paimon", DataSinkFactory.class); + Assertions.assertTrue(sinkFactory instanceof PaimonDataSinkFactory); Review Comment: ```suggestion Assertions.assertInstanceOf(PaimonDataSinkFactory.class, sinkFactory); ``` ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.paimon.sink; + +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.factories.DataSinkFactory; +import org.apache.flink.cdc.common.factories.FactoryHelper; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils; + +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.UUID; + +/** Tests for {@link PaimonDataSinkFactory}. */ +public class PaimonDataSinkFactoryTest { + + @TempDir public static java.nio.file.Path temporaryFolder; + + @Test + public void testCreateDataSink() { + DataSinkFactory sinkFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier("paimon", DataSinkFactory.class); + Assertions.assertTrue(sinkFactory instanceof PaimonDataSinkFactory); + + Configuration conf = + Configuration.fromMap( + ImmutableMap.<String, String>builder() + .put(PaimonDataSinkOptions.METASTORE.key(), "filesystem") + .put( + PaimonDataSinkOptions.WAREHOUSE.key(), + new File( + temporaryFolder.toFile(), + UUID.randomUUID().toString()) + .toString()) + .build()); + DataSink dataSink = + sinkFactory.createDataSink( + new FactoryHelper.DefaultContext( + conf, conf, Thread.currentThread().getContextClassLoader())); + Assertions.assertTrue(dataSink instanceof PaimonDataSink); Review Comment: ```suggestion Assertions.assertInstanceOf(PaimonDataSink.class, dataSink); ``` ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.RowKind; + +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; + +/** A helper class for {@link PaimonWriter} to create FieldGetter and GenericRow. 
*/ +public class PaimonWriterHelper { + + /** create a list of {@link RecordData.FieldGetter} for {@link PaimonWriter}. */ + public static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) { + List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(schema.getColumns().size()); + for (int i = 0; i < schema.getColumns().size(); i++) { + fieldGetters.add(createFieldGetter(schema.getColumns().get(i).getType(), i, zoneId)); + } + return fieldGetters; + } + + private static RecordData.FieldGetter createFieldGetter( + DataType fieldType, int fieldPos, ZoneId zoneId) { + final RecordData.FieldGetter fieldGetter; + // ordered by type root definition + switch (fieldType.getTypeRoot()) { + case CHAR: + case VARCHAR: + fieldGetter = row -> BinaryString.fromString(row.getString(fieldPos).toString()); + break; + case BOOLEAN: + fieldGetter = row -> row.getBoolean(fieldPos); + break; + case BINARY: + case VARBINARY: + fieldGetter = row -> row.getBinary(fieldPos); + break; + case DECIMAL: + final int decimalPrecision = DataTypeChecks.getPrecision(fieldType); + final int decimalScale = DataTypeChecks.getScale(fieldType); + fieldGetter = + row -> { + DecimalData decimalData = + row.getDecimal(fieldPos, decimalPrecision, decimalScale); + return Decimal.fromBigDecimal( + decimalData.toBigDecimal(), decimalPrecision, decimalScale); + }; + break; + case TINYINT: + fieldGetter = row -> row.getByte(fieldPos); + break; + case SMALLINT: + fieldGetter = row -> row.getShort(fieldPos); + break; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + fieldGetter = row -> row.getInt(fieldPos); + break; + case BIGINT: + fieldGetter = row -> row.getLong(fieldPos); + break; + case FLOAT: + fieldGetter = row -> row.getFloat(fieldPos); + break; + case DOUBLE: + fieldGetter = row -> row.getDouble(fieldPos); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + fieldGetter = + row -> + Timestamp.fromSQLTimestamp( + row.getTimestamp( + fieldPos, + DataTypeChecks.getPrecision(fieldType)) + .toTimestamp()); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case TIMESTAMP_WITH_TIME_ZONE: + fieldGetter = + row -> + Timestamp.fromLocalDateTime( + ZonedDateTime.ofInstant( + row.getLocalZonedTimestampData( + fieldPos, + DataTypeChecks.getPrecision( + fieldType)) + .toInstant(), + zoneId) + .toLocalDateTime()); + break; + default: + throw new IllegalArgumentException( + "don't support type of " + fieldType.getTypeRoot()); + } + if (!fieldType.isNullable()) { + return fieldGetter; + } + return row -> { + if (row.isNullAt(fieldPos)) { + return null; + } + return fieldGetter.getFieldOrNull(row); Review Comment: Minor concern about the interface semantic: I wonder if `FieldGetter::getFieldOrNull` is handling the null field internally (by performing the `row.isNullAt` check)? Because now it's the external caller checking if the `row` is null, which doesn't quite match its name (get field or return null). 
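If it helps the discussion, here is a minimal sketch of pulling that wrapping step into a named helper, so the `isNullAt` check visibly lives inside the returned getter rather than with the caller (the helper class and method names are made up for illustration):

```java
import org.apache.flink.cdc.common.data.RecordData;

/** Sketch only: wraps a position-specific getter so null fields short-circuit to null. */
class NullableFieldGetterSketch {

    static RecordData.FieldGetter wrapNullable(RecordData.FieldGetter getter, int fieldPos) {
        // Same behaviour as the inline wrapper at the end of createFieldGetter above:
        // the returned getter performs the null check itself before delegating.
        return row -> row.isNullAt(fieldPos) ? null : getter.getFieldOrNull(row);
    }
}
```

The behaviour is identical to the current code; the open question is only whether `FieldGetter::getFieldOrNull` implementations are expected to perform that check themselves.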
########## docs/content/pipelines/paimon-pipeline(ZH).md: ########## @@ -0,0 +1,209 @@ +# Paimon Pipeline 连接器 + +Paimon Pipeline 连接器可以用作 Pipeline 的 *Data Sink*,将数据写入[Paimon](https://paimon.apache.org)。 本文档介绍如何设置 Paimon Pipeline 连接器。 + +## 连接器的功能 +* 自动建表 +* 表结构变更同步 +* 数据实时同步 + +如何创建 Pipeline +---------------- + +从 MySQL 读取数据同步到 Paimon 的 Pipeline 可以定义如下: + +```yaml +source: + type: mysql + name: MySQL Source + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5401-5404 + +sink: + type: paimon + name: Paimon Sink + metastore: filesystem + warehouse: /path/warehouse + +pipeline: + name: MySQL to Paimon Pipeline + parallelism: 2 +``` + +Pipeline 连接器配置项 +---------------- +<div class="highlight"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Option</th> + <th class="text-left" style="width: 8%">Required</th> + <th class="text-left" style="width: 7%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 50%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td>type</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>指定要使用的连接器, 这里需要设置成 <code>'paimon'</code>.</td> + </tr> + <tr> + <td>name</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Sink 的名称.</td> + </tr> + <tr> + <td>catalog.properties.metastore</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>用于构建 Paimon Catalog 的类型。可选填值 filesystem 或者 hive。</td> + </tr> + <tr> + <td>catalog.properties.warehouse</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Paimon 仓库存储数据的根目录。</td> + </tr> + <tr> + <td>catalog.properties.uri</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Hive metastore 的 uri,在 metastore 设置为 hive 的时候需要。</td> + </tr> + <tr> + <td>commit.user</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>提交数据文件时的用户名, 默认值为 `admin`.</td> + </tr> Review Comment: ```suggestion <tr> <td>commit.user</td> <td>optional</td> <td style="word-wrap: break-word;"><code>admin</code></td> <td>String</td> <td>提交数据文件时的用户名。</td> </tr> ``` ########## docs/content/pipelines/paimon-pipeline(ZH).md: ########## @@ -0,0 +1,209 @@ +# Paimon Pipeline 连接器 + +Paimon Pipeline 连接器可以用作 Pipeline 的 *Data Sink*,将数据写入[Paimon](https://paimon.apache.org)。 本文档介绍如何设置 Paimon Pipeline 连接器。 + +## 连接器的功能 +* 自动建表 +* 表结构变更同步 +* 数据实时同步 + +如何创建 Pipeline +---------------- + +从 MySQL 读取数据同步到 Paimon 的 Pipeline 可以定义如下: + +```yaml +source: + type: mysql + name: MySQL Source + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5401-5404 + +sink: + type: paimon + name: Paimon Sink + metastore: filesystem + warehouse: /path/warehouse + +pipeline: + name: MySQL to Paimon Pipeline + parallelism: 2 +``` Review Comment: Inconsistent indentation. ########## docs/content/pipelines/index.md: ########## Review Comment: Should this be placed in `content/docs/pipelines`, rename it to `_index.md` and specify metadata info just like other units? ########## docs/content/pipelines/paimon-pipeline(ZH).md: ########## Review Comment: Should Chinese version documentations be placed separately in `contents.zh` folder? 
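Following up on the "Inconsistent indentation" note above: for reference, the example pipeline definition with uniform two-space indentation would look like the block below (all keys and values are taken verbatim from the doc snippet; only the whitespace differs):

```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: paimon
  name: Paimon Sink
  metastore: filesystem
  warehouse: /path/warehouse

pipeline:
  name: MySQL to Paimon Pipeline
  parallelism: 2
```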
########## docs/content/pipelines/paimon-pipeline(ZH).md: ########## @@ -0,0 +1,209 @@ +# Paimon Pipeline 连接器 + +Paimon Pipeline 连接器可以用作 Pipeline 的 *Data Sink*,将数据写入[Paimon](https://paimon.apache.org)。 本文档介绍如何设置 Paimon Pipeline 连接器。 + +## 连接器的功能 +* 自动建表 +* 表结构变更同步 +* 数据实时同步 + +如何创建 Pipeline +---------------- + +从 MySQL 读取数据同步到 Paimon 的 Pipeline 可以定义如下: + +```yaml +source: + type: mysql + name: MySQL Source + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5401-5404 + +sink: + type: paimon + name: Paimon Sink + metastore: filesystem + warehouse: /path/warehouse + +pipeline: + name: MySQL to Paimon Pipeline + parallelism: 2 +``` + +Pipeline 连接器配置项 +---------------- +<div class="highlight"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Option</th> + <th class="text-left" style="width: 8%">Required</th> + <th class="text-left" style="width: 7%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 50%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td>type</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>指定要使用的连接器, 这里需要设置成 <code>'paimon'</code>.</td> + </tr> + <tr> + <td>name</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Sink 的名称.</td> + </tr> + <tr> + <td>catalog.properties.metastore</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>用于构建 Paimon Catalog 的类型。可选填值 filesystem 或者 hive。</td> + </tr> Review Comment: Though `metastore` field is required, it has default value `filesystem` so user don't have to specify it explicitly. ```suggestion <tr> <td>catalog.properties.metastore</td> <td>optional</td> <td style="word-wrap: break-word;"><code>"filesystem"</code></td> <td>String</td> <td>用于构建 Paimon Catalog 的类型。可选填值 filesystem 或者 hive。</td> </tr> ``` ########## docs/content/pipelines/paimon-pipeline.md: ########## @@ -0,0 +1,209 @@ +# Paimon Pipeline Connector + +The Paimon Pipeline connector can be used as the *Data Sink* of the pipeline, and write data to [Paimon](https://paimon.apache.org). This document describes how to set up the Paimon Pipeline connector. + +## What can the connector do? +* Create table automatically if not exist +* Schema change synchronization +* Data synchronization + +How to create Pipeline +---------------- + +The pipeline for reading data from MySQL and sink to Paimon can be defined as follows: + +```yaml +source: + type: mysql + name: MySQL Source + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5401-5404 + +sink: + type: paimon + name: Paimon Sink + metastore: filesystem + warehouse: /path/warehouse + +pipeline: + name: MySQL to Paimon Pipeline + parallelism: 2 +``` Review Comment: Inconsistent indentation. ########## docs/content/pipelines/paimon-pipeline.md: ########## @@ -0,0 +1,209 @@ +# Paimon Pipeline Connector + +The Paimon Pipeline connector can be used as the *Data Sink* of the pipeline, and write data to [Paimon](https://paimon.apache.org). This document describes how to set up the Paimon Pipeline connector. + +## What can the connector do? 
+* Create table automatically if not exist +* Schema change synchronization +* Data synchronization + +How to create Pipeline +---------------- + +The pipeline for reading data from MySQL and sink to Paimon can be defined as follows: + +```yaml +source: + type: mysql + name: MySQL Source + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5401-5404 + +sink: + type: paimon + name: Paimon Sink + metastore: filesystem + warehouse: /path/warehouse + +pipeline: + name: MySQL to Paimon Pipeline + parallelism: 2 +``` + +Pipeline Connector Options +---------------- +<div class="highlight"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Option</th> + <th class="text-left" style="width: 8%">Required</th> + <th class="text-left" style="width: 7%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 50%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td>type</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Specify what connector to use, here should be <code>'paimon'</code>.</td> + </tr> + <tr> + <td>name</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The name of the sink.</td> + </tr> + <tr> + <td>catalog.properties.metastore</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Metastore of paimon catalog, supports filesystem and hive.</td> + </tr> Review Comment: Though `metastore` field is marked required, it has default value `filesystem` so user don't have to specify it explicitly. ```suggestion <tr> <td>catalog.properties.metastore</td> <td>optional</td> <td style="word-wrap: break-word;"><code>"filesystem"</code></td> <td>String</td> <td>Metastore of paimon catalog, supports filesystem and hive.</td> </tr> ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
