This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new d3c049d8a [FLINK-36524][pipeline-connector][paimon] Bump Paimon version to 0.9.0 d3c049d8a is described below commit d3c049d8a7a21fa0d89747f5596cf3f44b3559e1 Author: Kunni <lvyanquan....@alibaba-inc.com> AuthorDate: Thu Jan 9 12:01:52 2025 +0800 [FLINK-36524][pipeline-connector][paimon] Bump Paimon version to 0.9.0 This closes #3644 --- .../flink-cdc-pipeline-connector-paimon/pom.xml | 2 +- .../paimon/sink/PaimonDataSinkFactory.java | 2 ++ .../connectors/paimon/sink/v2/PaimonCommitter.java | 4 ++-- .../paimon/sink/v2/StoreSinkWriteImpl.java | 3 +++ .../sink/v2/bucket/BucketAssignOperator.java | 8 +++---- .../paimon/sink/PaimonMetadataApplierTest.java | 25 ++++++++++++++++++++++ .../paimon/sink/v2/PaimonSinkITCase.java | 2 +- 7 files changed, 38 insertions(+), 8 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml index e6b510ce6..940bbc0ac 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml @@ -29,7 +29,7 @@ limitations under the License. <artifactId>flink-cdc-pipeline-connector-paimon</artifactId> <properties> - <paimon.version>0.8.2</paimon.version> + <paimon.version>0.9.0</paimon.version> <hadoop.version>2.8.5</hadoop.version> <hive.version>2.3.9</hive.version> <mockito.version>3.4.6</mockito.version> diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java index 302ba629a..388658fe6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java @@ -69,6 +69,8 @@ public class PaimonDataSinkFactory implements DataSinkFactory { } }); Options options = Options.fromMap(catalogOptions); + // Avoid using previous table schema. + options.setString("cache-enabled", "false"); try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options)) { Preconditions.checkNotNull( catalog.listDatabases(), "catalog option of Paimon is invalid."); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java index 07abb03bf..03c0be6be 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java @@ -45,8 +45,8 @@ public class PaimonCommitter implements Committer<MultiTableCommittable> { storeMultiCommitter = new StoreMultiCommitter( () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions), - commitUser, - null); + org.apache.paimon.flink.sink.Committer.createContext( + commitUser, null, true, false, null)); } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java index fb7489542..21b21d50d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java @@ -142,6 +142,9 @@ public class StoreSinkWriteImpl implements StoreSinkWrite { write.withCompactExecutor(compactExecutor); } + @Override + public void withInsertOnly(boolean b) {} + @Override public SinkRecord write(InternalRow internalRow) throws Exception { return write.writeAndReturn(internalRow); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java index 9b3b3afb9..b528f53aa 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java @@ -151,7 +151,7 @@ public class BucketAssignOperator extends AbstractStreamOperator<Event> dataChangeEvent, schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters()); switch (tuple4.f0) { - case DYNAMIC: + case HASH_DYNAMIC: { bucket = tuple4.f2.assign( @@ -159,18 +159,18 @@ public class BucketAssignOperator extends AbstractStreamOperator<Event> tuple4.f3.trimmedPrimaryKey(genericRow).hashCode()); break; } - case FIXED: + case HASH_FIXED: { tuple4.f1.setRecord(genericRow); bucket = tuple4.f1.bucket(); break; } - case UNAWARE: + case BUCKET_UNAWARE: { bucket = 0; break; } - case GLOBAL_DYNAMIC: + case CROSS_PARTITION: default: { throw new RuntimeException("Unsupported bucket mode: " + tuple4.f0); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java index 9f3cd806c..7b362ee0d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java @@ -87,6 +87,7 @@ public class PaimonMetadataApplierTest { } catalogOptions.setString("metastore", metastore); catalogOptions.setString("warehouse", warehouse); + catalogOptions.setString("cache-enabled", "false"); this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); this.catalog.dropDatabase(TEST_DATABASE, true, true); } @@ -206,6 +207,30 @@ public class PaimonMetadataApplierTest { catalog.getTable(Identifier.fromString("test.table_with_partition")); Assertions.assertEquals(tableSchema, tableWithPartition.rowType()); Assertions.assertEquals(Arrays.asList("col1", "dt"), tableWithPartition.primaryKeys()); + // Create table with upper case. + catalogOptions.setString("allow-upper-case", "true"); + metadataApplier = new PaimonMetadataApplier(catalogOptions); + createTableEvent = + new CreateTableEvent( + TableId.parse("test.table_with_upper_case"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "COL1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", org.apache.flink.cdc.common.types.DataTypes.INT()) + .primaryKey("COL1") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "COL1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()))); + Assertions.assertEquals( + tableSchema, + catalog.getTable(Identifier.fromString("test.table_with_upper_case")).rowType()); } @ParameterizedTest diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index 3a554ef2f..8c54837e8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -127,7 +127,7 @@ public class PaimonSinkITCase { + "'metastore'='hive', " + "'hadoop-conf-dir'='%s', " + "'hive-conf-dir'='%s', " - + "'cache-enabled'='false' " + + "'cache-enabled'='false'" + ")", warehouse, HADOOP_CONF_DIR, HIVE_CONF_DIR)); } else {