This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new d3c049d8a [FLINK-36524][pipeline-connector][paimon] Bump Paimon version to 0.9.0
d3c049d8a is described below

commit d3c049d8a7a21fa0d89747f5596cf3f44b3559e1
Author: Kunni <lvyanquan....@alibaba-inc.com>
AuthorDate: Thu Jan 9 12:01:52 2025 +0800

    [FLINK-36524][pipeline-connector][paimon] Bump Paimon version to 0.9.0
    
    This closes #3644
---
 .../flink-cdc-pipeline-connector-paimon/pom.xml    |  2 +-
 .../paimon/sink/PaimonDataSinkFactory.java         |  2 ++
 .../connectors/paimon/sink/v2/PaimonCommitter.java |  4 ++--
 .../paimon/sink/v2/StoreSinkWriteImpl.java         |  3 +++
 .../sink/v2/bucket/BucketAssignOperator.java       |  8 +++----
 .../paimon/sink/PaimonMetadataApplierTest.java     | 25 ++++++++++++++++++++++
 .../paimon/sink/v2/PaimonSinkITCase.java           |  2 +-
 7 files changed, 38 insertions(+), 8 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
index e6b510ce6..940bbc0ac 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
@@ -29,7 +29,7 @@ limitations under the License.
     <artifactId>flink-cdc-pipeline-connector-paimon</artifactId>
 
     <properties>
-        <paimon.version>0.8.2</paimon.version>
+        <paimon.version>0.9.0</paimon.version>
         <hadoop.version>2.8.5</hadoop.version>
         <hive.version>2.3.9</hive.version>
         <mockito.version>3.4.6</mockito.version>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
index 302ba629a..388658fe6 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
@@ -69,6 +69,8 @@ public class PaimonDataSinkFactory implements DataSinkFactory {
                     }
                 });
         Options options = Options.fromMap(catalogOptions);
+        // Avoid using previous table schema.
+        options.setString("cache-enabled", "false");
         try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options)) {
             Preconditions.checkNotNull(
                     catalog.listDatabases(), "catalog option of Paimon is invalid.");
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java
index 07abb03bf..03c0be6be 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java
@@ -45,8 +45,8 @@ public class PaimonCommitter implements Committer<MultiTableCommittable> {
         storeMultiCommitter =
                 new StoreMultiCommitter(
                        () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions),
-                        commitUser,
-                        null);
+                        org.apache.paimon.flink.sink.Committer.createContext(
+                                commitUser, null, true, false, null));
     }
 
     @Override
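
For context, Paimon 0.9 reworked the StoreMultiCommitter constructor: the commit user and metric group are no longer passed as separate trailing arguments but bundled into a context object built via Committer.createContext. Judging from this call site, the five arguments are the commit user, a metric group (null here), two boolean flags, and a state handle (null); the exact parameter names are not visible in this diff. The migration in miniature:

    // Paimon 0.8.x constructor (removed by this commit):
    //     new StoreMultiCommitter(catalogSupplier, commitUser, null);
    //
    // Paimon 0.9.0 constructor, as used above:
    new StoreMultiCommitter(
            () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions),
            org.apache.paimon.flink.sink.Committer.createContext(
                    commitUser, null, true, false, null));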
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
index fb7489542..21b21d50d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
@@ -142,6 +142,9 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
         write.withCompactExecutor(compactExecutor);
     }
 
+    @Override
+    public void withInsertOnly(boolean b) {}
+
     @Override
     public SinkRecord write(InternalRow internalRow) throws Exception {
         return write.writeAndReturn(internalRow);
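
withInsertOnly(boolean) is new on Paimon 0.9's StoreSinkWrite interface (hence the @Override); leaving the body empty means the insert-only hint is simply ignored here, which is the safe choice for a CDC writer that must also handle updates and deletes. An equivalent override with the intent spelled out (comment wording is ours, not from the commit):

    @Override
    public void withInsertOnly(boolean insertOnly) {
        // Intentionally a no-op: CDC streams carry updates and deletes,
        // so the insert-only fast path is never enabled for this writer.
    }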
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
index 9b3b3afb9..b528f53aa 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
@@ -151,7 +151,7 @@ public class BucketAssignOperator extends AbstractStreamOperator<Event>
                             dataChangeEvent,
                             schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
             switch (tuple4.f0) {
-                case DYNAMIC:
+                case HASH_DYNAMIC:
                     {
                         bucket =
                                 tuple4.f2.assign(
@@ -159,18 +159,18 @@ public class BucketAssignOperator extends AbstractStreamOperator<Event>
                                         tuple4.f3.trimmedPrimaryKey(genericRow).hashCode());
                         break;
                     }
-                case FIXED:
+                case HASH_FIXED:
                     {
                         tuple4.f1.setRecord(genericRow);
                         bucket = tuple4.f1.bucket();
                         break;
                     }
-                case UNAWARE:
+                case BUCKET_UNAWARE:
                     {
                         bucket = 0;
                         break;
                     }
-                case GLOBAL_DYNAMIC:
+                case CROSS_PARTITION:
                 default:
                     {
                         throw new RuntimeException("Unsupported bucket mode: " + tuple4.f0);
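
The switch arms above track the renaming of Paimon's BucketMode enum constants in 0.9. As evidenced by this hunk, the mapping is (the trailing glosses are ours, inferred from the branch bodies, not from the diff):

    DYNAMIC        -> HASH_DYNAMIC     // bucket assigned dynamically from a primary-key hash
    FIXED          -> HASH_FIXED       // bucket computed by the fixed bucket assigner
    UNAWARE        -> BUCKET_UNAWARE   // append-only; this operator pins bucket 0
    CROSS_PARTITION (was GLOBAL_DYNAMIC) // still unsupported here, falls through to the throw

The behavior of each branch is unchanged; only the enum names moved.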
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
index 9f3cd806c..7b362ee0d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -87,6 +87,7 @@ public class PaimonMetadataApplierTest {
         }
         catalogOptions.setString("metastore", metastore);
         catalogOptions.setString("warehouse", warehouse);
+        catalogOptions.setString("cache-enabled", "false");
         this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
         this.catalog.dropDatabase(TEST_DATABASE, true, true);
     }
@@ -206,6 +207,30 @@ public class PaimonMetadataApplierTest {
                 catalog.getTable(Identifier.fromString("test.table_with_partition"));
         Assertions.assertEquals(tableSchema, tableWithPartition.rowType());
         Assertions.assertEquals(Arrays.asList("col1", "dt"), tableWithPartition.primaryKeys());
+        // Create table with upper case.
+        catalogOptions.setString("allow-upper-case", "true");
+        metadataApplier = new PaimonMetadataApplier(catalogOptions);
+        createTableEvent =
+                new CreateTableEvent(
+                        TableId.parse("test.table_with_upper_case"),
+                        org.apache.flink.cdc.common.schema.Schema.newBuilder()
+                                .physicalColumn(
+                                        "COL1",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING()
+                                                .notNull())
+                                .physicalColumn(
+                                        "col2", org.apache.flink.cdc.common.types.DataTypes.INT())
+                                .primaryKey("COL1")
+                                .build());
+        metadataApplier.applySchemaChange(createTableEvent);
+        tableSchema =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "COL1", DataTypes.STRING().notNull()),
+                                new DataField(1, "col2", DataTypes.INT())));
+        Assertions.assertEquals(
+                tableSchema,
+                catalog.getTable(Identifier.fromString("test.table_with_upper_case")).rowType());
     }
 
     @ParameterizedTest
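
The new test code relies on two catalog options: cache-enabled=false in setup, so assertions on rowType() observe schemas as applied rather than a cached copy, and allow-upper-case=true, without which a Paimon catalog may reject or normalize a column named COL1 (the exact behavior depends on the metastore and is not shown in this diff). Condensed, the upper-case path under test is:

    // With allow-upper-case=true, the applier should create the table with
    // mixed-case column names preserved end to end (condensed from the test).
    catalogOptions.setString("allow-upper-case", "true");
    PaimonMetadataApplier applier = new PaimonMetadataApplier(catalogOptions);
    applier.applySchemaChange(
            new CreateTableEvent(
                    TableId.parse("test.table_with_upper_case"),
                    org.apache.flink.cdc.common.schema.Schema.newBuilder()
                            .physicalColumn(
                                    "COL1",
                                    org.apache.flink.cdc.common.types.DataTypes.STRING().notNull())
                            .physicalColumn(
                                    "col2", org.apache.flink.cdc.common.types.DataTypes.INT())
                            .primaryKey("COL1")
                            .build()));
    // The created Paimon table should then report COL1/col2 via rowType().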
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index 3a554ef2f..8c54837e8 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -127,7 +127,7 @@ public class PaimonSinkITCase {
                                     + "'metastore'='hive', "
                                     + "'hadoop-conf-dir'='%s', "
                                     + "'hive-conf-dir'='%s', "
-                                    + "'cache-enabled'='false' "
+                                    + "'cache-enabled'='false'"
                                     + ")",
                             warehouse, HADOOP_CONF_DIR, HIVE_CONF_DIR));
         } else {
