This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c8ce3350a [flink] CDC schema evolution should pass catalog information (#1521)
c8ce3350a is described below
commit c8ce3350a7b9dc7fc42c7f07f98af7e8d6d652f6
Author: yuzelin <[email protected]>
AuthorDate: Sun Jul 9 10:57:49 2023 +0800
[flink] CDC schema evolution should pass catalog information (#1521)
---
docs/content/api/flink-api.md | 17 ++++-
.../java/org/apache/paimon/catalog/Catalog.java | 8 ++-
.../apache/paimon/catalog/FileSystemCatalog.java | 8 +--
.../org/apache/paimon/schema/SchemaManager.java | 14 +++--
.../org/apache/paimon/catalog/CatalogTestBase.java | 72 +++++++++++++---------
.../paimon/table/SchemaEvolutionTableTestBase.java | 2 +-
.../apache/paimon/table/SchemaEvolutionTest.java | 12 ++--
.../java/org/apache/paimon/flink/FlinkCatalog.java | 8 ++-
.../org/apache/paimon/flink/action/ActionBase.java | 13 +++-
.../action/cdc/kafka/KafkaSyncDatabaseAction.java | 4 +-
.../action/cdc/kafka/KafkaSyncTableAction.java | 4 +-
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 13 +---
.../action/cdc/mysql/MySqlSyncTableAction.java | 4 +-
.../paimon/flink/sink/cdc/CdcSinkBuilder.java | 21 ++++++-
.../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 39 +++++++-----
...MultiTableUpdatedDataFieldsProcessFunction.java | 13 ++--
.../paimon/flink/sink/cdc/RichCdcSinkBuilder.java | 16 +++++
.../sink/cdc/UpdatedDataFieldsProcessFunction.java | 25 ++++++--
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 29 ++++++---
.../cdc/mysql/MySqlSyncTableActionITCase.java | 7 ++-
.../cdc/mysql/TestAlterTableCatalogFactory.java | 59 ++++++++++++++++++
.../mysql/TestCaseInsensitiveCatalogFactory.java | 3 +-
.../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java | 10 +++
.../sink/cdc/FlinkCdcSyncTableSinkITCase.java | 31 +++++++++-
.../org.apache.paimon.catalog.CatalogFactory | 1 +
.../java/org/apache/paimon/hive/HiveCatalog.java | 30 ++++-----
.../java/org/apache/paimon/spark/SparkCatalog.java | 4 +-
.../paimon/spark/SparkSchemaEvolutionITCase.java | 37 ++++++-----
28 files changed, 361 insertions(+), 143 deletions(-)
diff --git a/docs/content/api/flink-api.md b/docs/content/api/flink-api.md
index d39952b10..f2dd1f8c4 100644
--- a/docs/content/api/flink-api.md
+++ b/docs/content/api/flink-api.md
@@ -159,6 +159,7 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.cdc.RichCdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcSinkBuilder;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataTypes;
@@ -186,12 +187,23 @@ public class WriteCdcToTable {
.field("dt", DataTypes.TIMESTAMP(),
"2023-06-12 20:21:12")
.build());
- new
RichCdcSinkBuilder().withInput(dataStream).withTable(createTableIfNotExists()).build();
+ Identifier identifier = Identifier.create("my_db", "T");
+ Options catalogOptions = new Options();
+ catalogOptions.set("warehouse", "/path/to/warehouse");
+ Catalog.Loader catalogLoader =
+ () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+
+ new RichCdcSinkBuilder()
+ .withInput(dataStream)
+ .withTable(createTableIfNotExists(identifier))
+ .withIdentifier(identifier)
+ .withCatalogLoader(catalogLoader)
+ .build();
env.execute();
}
- private static Table createTableIfNotExists() throws Exception {
+ private static Table createTableIfNotExists(Identifier identifier) throws
Exception {
CatalogContext context = CatalogContext.create(new Path("..."));
Catalog catalog = CatalogFactory.createCatalog(context);
@@ -200,7 +212,6 @@ public class WriteCdcToTable {
schemaBuilder.column("order_id", DataTypes.BIGINT());
schemaBuilder.column("price", DataTypes.DOUBLE());
Schema schema = schemaBuilder.build();
- Identifier identifier = Identifier.create("my_db", "T");
try {
catalog.createTable(identifier, schema, false);
} catch (Catalog.TableAlreadyExistException e) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index b26d7fe37..0004680c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -24,6 +24,7 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
import java.io.Serializable;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -181,7 +182,12 @@ public interface Catalog extends AutoCloseable {
* @throws TableNotExistException if the table does not exist
*/
void alterTable(Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
- throws TableNotExistException;
+ throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException;
+
+ default void alterTable(Identifier identifier, SchemaChange change,
boolean ignoreIfNotExists)
+ throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
+ alterTable(identifier, Collections.singletonList(change),
ignoreIfNotExists);
+ }
/** Return a boolean that indicates whether this catalog is
case-sensitive. */
default boolean caseSensitive() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 8a52c5182..16a99b1b7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -189,15 +189,13 @@ public class FileSystemCatalog extends AbstractCatalog {
@Override
public void alterTable(
Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
- throws TableNotExistException {
+ throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
checkNotSystemTable(identifier, "alterTable");
if (!tableExists(getDataTableLocation(identifier))) {
throw new TableNotExistException(identifier);
}
- uncheck(
- () ->
- new SchemaManager(fileIO,
getDataTableLocation(identifier))
- .commitChanges(changes));
+
+ new SchemaManager(fileIO,
getDataTableLocation(identifier)).commitChanges(changes);
}
private static <T> T uncheck(Callable<T> callable) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index e47191b81..f46d3b8c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -187,7 +187,9 @@ public class SchemaManager implements Serializable {
}
/** Update {@link SchemaChange}s. */
- public TableSchema commitChanges(List<SchemaChange> changes) throws
Exception {
+ public TableSchema commitChanges(List<SchemaChange> changes)
+ throws Catalog.TableNotExistException,
Catalog.ColumnAlreadyExistException,
+ Catalog.ColumnNotExistException {
while (true) {
TableSchema schema =
latest().orElseThrow(
@@ -372,9 +374,13 @@ public class SchemaManager implements Serializable {
newOptions,
schema.comment());
- boolean success = commit(newSchema);
- if (success) {
- return newSchema;
+ try {
+ boolean success = commit(newSchema);
+ if (success) {
+ return newSchema;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index b498e2d06..9c71fb57d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -41,6 +41,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static
org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -468,8 +469,10 @@ public abstract class CatalogTestBase {
Lists.newArrayList(
SchemaChange.addColumn("col1",
DataTypes.INT())),
false))
-
.hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class)
- .hasRootCauseMessage("Column col1 already exists in the
test_db.test_table table.");
+ .satisfies(
+ anyCauseMatches(
+ Catalog.ColumnAlreadyExistException.class,
+ "Column col1 already exists in the
test_db.test_table table."));
}
@Test
@@ -506,8 +509,10 @@ public abstract class CatalogTestBase {
Lists.newArrayList(
SchemaChange.renameColumn("col1", "new_col1")),
false))
-
.hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class)
- .hasRootCauseMessage("Column col1 already exists in the
test_db.test_table table.");
+ .satisfies(
+ anyCauseMatches(
+ Catalog.ColumnAlreadyExistException.class,
+ "Column col1 already exists in the
test_db.test_table table."));
// Alter table renames a column throws ColumnNotExistException when
column does not exist
assertThatThrownBy(
@@ -518,9 +523,10 @@ public abstract class CatalogTestBase {
SchemaChange.renameColumn(
"non_existing_col",
"new_col2")),
false))
- .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
- .hasRootCauseMessage(
- "Column [non_existing_col] does not exist in the
test_db.test_table table.");
+ .satisfies(
+ anyCauseMatches(
+ Catalog.ColumnNotExistException.class,
+ "Column [non_existing_col] does not exist in
the test_db.test_table table."));
}
@Test
@@ -553,8 +559,9 @@ public abstract class CatalogTestBase {
identifier,
Lists.newArrayList(SchemaChange.dropColumn("col2")),
false))
- .hasRootCauseInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining(" Cannot drop all fields in table");
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class, "Cannot drop
all fields in table"));
// Alter table drop a column throws ColumnNotExistException when
column does not exist
assertThatThrownBy(
@@ -564,9 +571,10 @@ public abstract class CatalogTestBase {
Lists.newArrayList(
SchemaChange.dropColumn("non_existing_col")),
false))
- .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
- .hasRootCauseMessage(
- "Column non_existing_col does not exist in the
test_db.test_table table.");
+ .satisfies(
+ anyCauseMatches(
+ Catalog.ColumnNotExistException.class,
+ "Column non_existing_col does not exist in the
test_db.test_table table."));
}
@Test
@@ -605,9 +613,10 @@ public abstract class CatalogTestBase {
SchemaChange.updateColumnType(
"col1",
DataTypes.DATE())),
false))
- .hasRootCauseInstanceOf(IllegalStateException.class)
- .hasMessageContaining(
- "Column type col1[DOUBLE] cannot be converted to DATE
without loosing information.");
+ .satisfies(
+ anyCauseMatches(
+ IllegalStateException.class,
+ "Column type col1[DOUBLE] cannot be converted
to DATE without loosing information."));
// Alter table update a column type throws ColumnNotExistException
when column does not
// exist
@@ -619,9 +628,10 @@ public abstract class CatalogTestBase {
SchemaChange.updateColumnType(
"non_existing_col",
DataTypes.INT())),
false))
- .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
- .hasRootCauseMessage(
- "Column [non_existing_col] does not exist in the
test_db.test_table table.");
+ .satisfies(
+ anyCauseMatches(
+ Catalog.ColumnNotExistException.class,
+ "Column [non_existing_col] does not exist in
the test_db.test_table table."));
// Alter table update a column type throws Exception when column is
partition columns
assertThatThrownBy(
() ->
@@ -631,8 +641,10 @@ public abstract class CatalogTestBase {
SchemaChange.updateColumnType(
"dt",
DataTypes.DATE())),
false))
- .hasRootCauseInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("Cannot update partition column [dt]
type in the table");
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Cannot update partition column [dt] type in
the table"));
}
@Test
@@ -687,9 +699,10 @@ public abstract class CatalogTestBase {
SchemaChange.updateColumnComment(
new String[]
{"non_existing_col"}, "")),
false))
- .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
- .hasMessageContaining(
- "Column [non_existing_col] does not exist in the
test_db.test_table table.");
+ .satisfies(
+ anyCauseMatches(
+ Catalog.ColumnNotExistException.class,
+ "Column [non_existing_col] does not exist in
the test_db.test_table table."));
}
@Test
@@ -742,9 +755,10 @@ public abstract class CatalogTestBase {
SchemaChange.updateColumnNullability(
new String[]
{"non_existing_col"}, false)),
false))
- .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
- .hasMessageContaining(
- "Column [non_existing_col] does not exist in the
test_db.test_table table.");
+ .satisfies(
+ anyCauseMatches(
+ Catalog.ColumnNotExistException.class,
+ "Column [non_existing_col] does not exist in
the test_db.test_table table."));
// Alter table update a column nullability throws Exception when
column is pk columns
assertThatThrownBy(
@@ -755,7 +769,9 @@ public abstract class CatalogTestBase {
SchemaChange.updateColumnNullability(
new String[] {"col2"},
true)),
false))
- .hasRootCauseInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining("Cannot change nullability of primary
key");
+ .satisfies(
+ anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Cannot change nullability of primary key"));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
index e2630744b..ad76e557a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
@@ -513,7 +513,7 @@ public abstract class SchemaEvolutionTableTestBase {
}
@Override
- public TableSchema commitChanges(List<SchemaChange> changes) throws
Exception {
+ public TableSchema commitChanges(List<SchemaChange> changes) {
throw new UnsupportedOperationException();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 184dea774..163434cc5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -332,11 +333,12 @@ public class SchemaEvolutionTest {
schemaManager.commitChanges(
Collections.singletonList(
SchemaChange.renameColumn("f0", "_VALUE_KIND"))))
- .isInstanceOf(IllegalStateException.class)
- .hasMessage(
- String.format(
- "Field name[%s] in schema cannot be exist in
%s",
- "_VALUE_KIND", SYSTEM_FIELD_NAMES));
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ RuntimeException.class,
+ String.format(
+ "Field name[%s] in schema cannot be
exist in %s",
+ "_VALUE_KIND", SYSTEM_FIELD_NAMES)));
}
private List<String> readRecords(FileStoreTable table, Predicate filter)
throws IOException {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index b7a4bca89..f3df315bb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -384,7 +384,9 @@ public class FlinkCatalog extends AbstractCatalog {
try {
catalog.alterTable(toIdentifier(tablePath), changes,
ignoreIfNotExists);
- } catch (Catalog.TableNotExistException e) {
+ } catch (Catalog.TableNotExistException
+ | Catalog.ColumnAlreadyExistException
+ | Catalog.ColumnNotExistException e) {
throw new TableNotExistException(getName(), tablePath);
}
}
@@ -415,7 +417,9 @@ public class FlinkCatalog extends AbstractCatalog {
try {
catalog.alterTable(toIdentifier(tablePath), changes,
ignoreIfNotExists);
- } catch (Catalog.TableNotExistException e) {
+ } catch (Catalog.TableNotExistException
+ | Catalog.ColumnAlreadyExistException
+ | Catalog.ColumnNotExistException e) {
throw new TableNotExistException(getName(), tablePath);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 49e805393..0e4eaa654 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -35,15 +35,16 @@ import java.util.UUID;
/** Abstract base of {@link Action} for table. */
public abstract class ActionBase implements Action {
- protected final Catalog catalog;
+ private final Options catalogOptions;
+ protected final Catalog catalog;
protected final FlinkCatalog flinkCatalog;
-
protected final String catalogName = "paimon-" + UUID.randomUUID();
public ActionBase(String warehouse, Map<String, String> catalogConfig) {
- Options catalogOptions = Options.fromMap(catalogConfig);
+ catalogOptions = Options.fromMap(catalogConfig);
catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
+
catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog);
}
@@ -53,4 +54,10 @@ public abstract class ActionBase implements Action {
String name =
conf.getOptional(PipelineOptions.NAME).orElse(defaultName);
env.execute(name);
}
+
+ protected Catalog.Loader catalogLoader() {
+ // to make the action workflow serializable
+ Options catalogOptions = this.catalogOptions;
+ return () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 676c9d344..f254ac5b4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -229,7 +229,9 @@ public class KafkaSyncDatabaseAction extends ActionBase {
env.fromSource(
source,
WatermarkStrategy.noWatermarks(), "Kafka Source"))
.withParserFactory(parserFactory)
- .withTables(fileStoreTables);
+ .withTables(fileStoreTables)
+ .withCatalogLoader(catalogLoader())
+ .withDatabase(database);
String sinkParallelism =
tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
if (sinkParallelism != null) {
sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index cbcef632c..418a83102 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -187,7 +187,9 @@ public class KafkaSyncTableAction extends ActionBase {
env.fromSource(
source,
WatermarkStrategy.noWatermarks(), "Kafka Source"))
.withParserFactory(parserFactory)
- .withTable(table);
+ .withTable(table)
+ .withIdentifier(identifier)
+ .withCatalogLoader(catalogLoader());
String sinkParallelism =
paimonConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
if (sinkParallelism != null) {
sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 1fe314242..97cde5d17 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -20,15 +20,12 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
@@ -114,7 +111,6 @@ public class MySqlSyncDatabaseAction extends ActionBase {
@Nullable private final Pattern excludingPattern;
private final Map<String, String> tableConfig;
private final String includingTables;
- private final Options catalogOptions;
private final MySqlDatabaseSyncMode mode;
public MySqlSyncDatabaseAction(
@@ -159,8 +155,6 @@ public class MySqlSyncDatabaseAction extends ActionBase {
this.includingTables = includingTables == null ? ".*" :
includingTables;
this.includingPattern = Pattern.compile(this.includingTables);
this.excludingPattern = excludingTables == null ? null :
Pattern.compile(excludingTables);
- this.catalogOptions = Options.fromMap(catalogConfig);
- catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
this.tableConfig = tableConfig;
this.mode = mode;
}
@@ -249,8 +243,6 @@ public class MySqlSyncDatabaseAction extends ActionBase {
String database = this.database;
MySqlDatabaseSyncMode mode = this.mode;
- // To make the sync database workflow serializable
- Options catalogOptions = this.catalogOptions;
FlinkCdcSyncDatabaseSinkBuilder<String> sinkBuilder =
new FlinkCdcSyncDatabaseSinkBuilder<String>()
.withInput(
@@ -258,10 +250,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
source,
WatermarkStrategy.noWatermarks(), "MySQL Source"))
.withParserFactory(parserFactory)
.withDatabase(database)
- .withCatalogLoader(
- () -> {
- return
FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
- })
+ .withCatalogLoader(catalogLoader())
.withTables(fileStoreTables)
.withMode(mode);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 5b83ba0f7..8382aee6d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -191,7 +191,9 @@ public class MySqlSyncTableAction extends ActionBase {
env.fromSource(
source,
WatermarkStrategy.noWatermarks(), "MySQL Source"))
.withParserFactory(parserFactory)
- .withTable(table);
+ .withTable(table)
+ .withIdentifier(identifier)
+ .withCatalogLoader(catalogLoader());
String sinkParallelism =
tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
if (sinkParallelism != null) {
sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 4d38b869f..5fd0f80d3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -19,6 +19,8 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.annotation.Experimental;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
@@ -45,6 +47,8 @@ public class CdcSinkBuilder<T> {
private DataStream<T> input = null;
private EventParser.Factory<T> parserFactory = null;
private Table table = null;
+ private Identifier identifier = null;
+ private Catalog.Loader catalogLoader = null;
@Nullable private Integer parallelism;
@@ -68,10 +72,22 @@ public class CdcSinkBuilder<T> {
return this;
}
+ public CdcSinkBuilder<T> withIdentifier(Identifier identifier) {
+ this.identifier = identifier;
+ return this;
+ }
+
+ public CdcSinkBuilder<T> withCatalogLoader(Catalog.Loader catalogLoader) {
+ this.catalogLoader = catalogLoader;
+ return this;
+ }
+
public DataStreamSink<?> build() {
Preconditions.checkNotNull(input, "Input DataStream can not be null.");
Preconditions.checkNotNull(parserFactory, "Event ParserFactory can not
be null.");
Preconditions.checkNotNull(table, "Paimon Table can not be null.");
+ Preconditions.checkNotNull(identifier, "Paimon Table Identifier can
not be null.");
+ Preconditions.checkNotNull(catalogLoader, "Paimon Catalog Loader can
not be null.");
if (!(table instanceof FileStoreTable)) {
throw new IllegalArgumentException(
@@ -90,8 +106,9 @@ public class CdcSinkBuilder<T> {
parsed,
CdcParsingProcessFunction.NEW_DATA_FIELD_LIST_OUTPUT_TAG)
.process(
new UpdatedDataFieldsProcessFunction(
- new SchemaManager(
- dataTable.fileIO(),
dataTable.location())));
+ new SchemaManager(dataTable.fileIO(),
dataTable.location()),
+ identifier,
+ catalogLoader));
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 404fcbd87..6ba1278a1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
@@ -90,9 +91,26 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
return this;
}
+ public FlinkCdcSyncDatabaseSinkBuilder<T> withDatabase(String database) {
+ this.database = database;
+ return this;
+ }
+
+ public FlinkCdcSyncDatabaseSinkBuilder<T> withCatalogLoader(Catalog.Loader
catalogLoader) {
+ this.catalogLoader = catalogLoader;
+ return this;
+ }
+
+ public FlinkCdcSyncDatabaseSinkBuilder<T> withMode(MySqlDatabaseSyncMode
mode) {
+ this.mode = mode;
+ return this;
+ }
+
public void build() {
Preconditions.checkNotNull(input);
Preconditions.checkNotNull(parserFactory);
+ Preconditions.checkNotNull(database);
+ Preconditions.checkNotNull(catalogLoader);
if (mode == UNIFIED) {
buildUnifiedCdcSink();
@@ -145,6 +163,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
}
private void buildStaticCdcSink() {
+ Preconditions.checkNotNull(tables);
+
SingleOutputStreamOperator<Void> parsed =
input.forward()
.process(new
CdcMultiTableParsingProcessFunction<>(parserFactory))
@@ -158,7 +178,9 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
.createUpdatedDataFieldsOutputTag(table.name()))
.process(
new UpdatedDataFieldsProcessFunction(
- new SchemaManager(table.fileIO(),
table.location())));
+ new SchemaManager(table.fileIO(),
table.location()),
+ Identifier.create(database,
table.name()),
+ catalogLoader));
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
@@ -183,19 +205,4 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
}
}
}
-
- public FlinkCdcSyncDatabaseSinkBuilder<T> withDatabase(String database) {
- this.database = database;
- return this;
- }
-
- public FlinkCdcSyncDatabaseSinkBuilder<T> withCatalogLoader(Catalog.Loader
catalogLoader) {
- this.catalogLoader = catalogLoader;
- return this;
- }
-
- public FlinkCdcSyncDatabaseSinkBuilder<T> withMode(MySqlDatabaseSyncMode
mode) {
- this.mode = mode;
- return this;
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index 1f0c800d6..834930355 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -98,7 +98,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
for (SchemaChange schemaChange :
extractSchemaChanges(schemaManager, updatedDataFields.f1)) {
- applySchemaChange(schemaManager, schemaChange);
+ applySchemaChange(schemaManager, schemaChange, tableId);
}
}
@@ -139,14 +139,15 @@ public class MultiTableUpdatedDataFieldsProcessFunction
return result;
}
- private void applySchemaChange(SchemaManager schemaManager, SchemaChange
schemaChange)
+ private void applySchemaChange(
+ SchemaManager schemaManager, SchemaChange schemaChange, Identifier
identifier)
throws Exception {
if (schemaChange instanceof SchemaChange.AddColumn) {
try {
- schemaManager.commitChanges(schemaChange);
+ catalog.alterTable(identifier, schemaChange, false);
} catch (Catalog.ColumnAlreadyExistException e) {
// This is normal. For example when a table is split into
multiple database tables,
- // all these tables will be added the same column. However
schemaManager can't
+ // all these tables will be added the same column. However,
schemaManager can't
// handle duplicated column adds, so we just catch the
exception and log it.
if (LOG.isDebugEnabled()) {
LOG.debug(
@@ -176,7 +177,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
DataType newType = updateColumnType.newDataType();
switch (canConvert(oldType, newType)) {
case CONVERT:
- schemaManager.commitChanges(schemaChange);
+ catalog.alterTable(identifier, schemaChange, false);
break;
case EXCEPTION:
throw new UnsupportedOperationException(
@@ -185,7 +186,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
updateColumnType.fieldName(), oldType,
newType));
}
} else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
- schemaManager.commitChanges(schemaChange);
+ catalog.alterTable(identifier, schemaChange, false);
} else {
throw new UnsupportedOperationException(
"Unsupported schema change class "
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
index 130d3812c..5648b695a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
@@ -19,6 +19,8 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.annotation.Experimental;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -32,6 +34,8 @@ public class RichCdcSinkBuilder {
private DataStream<RichCdcRecord> input = null;
private Table table = null;
+ private Identifier identifier = null;
+ private Catalog.Loader catalogLoader = null;
@Nullable private Integer parallelism;
@@ -50,12 +54,24 @@ public class RichCdcSinkBuilder {
return this;
}
+    /**
+     * Sets the identifier of the target table. The value is forwarded unchanged to the
+     * underlying {@link CdcSinkBuilder} in {@link #build()}.
+     */
+    public RichCdcSinkBuilder withIdentifier(Identifier identifier) {
+        this.identifier = identifier;
+        return this;
+    }
+
+    /**
+     * Sets the catalog loader. The value is forwarded unchanged to the underlying {@link
+     * CdcSinkBuilder} in {@link #build()}; presumably it is used to load the {@link Catalog}
+     * through which schema changes are applied (TODO confirm against CdcSinkBuilder).
+     */
+    public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
+        this.catalogLoader = catalogLoader;
+        return this;
+    }
+
+    /**
+     * Builds the sink by delegating to a new {@link CdcSinkBuilder} configured with a {@link
+     * RichCdcParserFactory} and this builder's table, input, parallelism, identifier and catalog
+     * loader.
+     */
     public DataStreamSink<?> build() {
         CdcSinkBuilder<RichCdcRecord> builder = new CdcSinkBuilder<>();
         return builder.withTable(table)
                 .withInput(input)
                 .withParserFactory(new RichCdcParserFactory())
                 .withParallelism(parallelism)
+                .withIdentifier(identifier)
+                .withCatalogLoader(catalogLoader)
                 .build();
     }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 5ff10d82d..b179dbc9e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -29,6 +30,7 @@ import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
@@ -55,8 +57,21 @@ public class UpdatedDataFieldsProcessFunction extends
ProcessFunction<List<DataF
private final SchemaManager schemaManager;
- public UpdatedDataFieldsProcessFunction(SchemaManager schemaManager) {
+ private final Catalog.Loader catalogLoader;
+ private final Identifier identifier;
+
+ private Catalog catalog;
+
+ public UpdatedDataFieldsProcessFunction(
+ SchemaManager schemaManager, Identifier identifier, Catalog.Loader
catalogLoader) {
this.schemaManager = schemaManager;
+ this.identifier = identifier;
+ this.catalogLoader = catalogLoader;
+ }
+
+    /**
+     * Loads the catalog through which schema changes are applied. The catalog is released in
+     * {@link #close()}.
+     */
+    @Override
+    public void open(Configuration parameters) {
+        this.catalog = catalogLoader.load();
     }
+
+    /**
+     * Closes the catalog created in {@link #open(Configuration)}. Without this, every task
+     * instance (and every restart after a failover) would leak the catalog's resources, such as
+     * a metastore client.
+     */
+    @Override
+    public void close() throws Exception {
+        if (catalog != null) {
+            catalog.close();
+            // Clear the reference so a repeated close() is a no-op.
+            catalog = null;
+        }
+    }
@Override
@@ -107,10 +122,10 @@ public class UpdatedDataFieldsProcessFunction extends
ProcessFunction<List<DataF
private void applySchemaChange(SchemaChange schemaChange) throws Exception
{
if (schemaChange instanceof SchemaChange.AddColumn) {
try {
- schemaManager.commitChanges(schemaChange);
+ catalog.alterTable(identifier, schemaChange, false);
} catch (Catalog.ColumnAlreadyExistException e) {
// This is normal. For example when a table is split into
multiple database tables,
- // all these tables will be added the same column. However
schemaManager can't
+ // all these tables will be added the same column. However,
schemaManager can't
// handle duplicated column adds, so we just catch the
exception and log it.
if (LOG.isDebugEnabled()) {
LOG.debug(
@@ -140,7 +155,7 @@ public class UpdatedDataFieldsProcessFunction extends
ProcessFunction<List<DataF
DataType newType = updateColumnType.newDataType();
switch (canConvert(oldType, newType)) {
case CONVERT:
- schemaManager.commitChanges(schemaChange);
+ catalog.alterTable(identifier, schemaChange, false);
break;
case EXCEPTION:
throw new UnsupportedOperationException(
@@ -149,7 +164,7 @@ public class UpdatedDataFieldsProcessFunction extends
ProcessFunction<List<DataF
updateColumnType.fieldName(), oldType,
newType));
}
} else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
- schemaManager.commitChanges(schemaChange);
+ catalog.alterTable(identifier, schemaChange, false);
} else {
throw new UnsupportedOperationException(
"Unsupported schema change class "
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 50b4611c4..0e3143a0a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -559,6 +559,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
assertTableNotExists(notExistedTables);
}
+ @Test
@Timeout(60)
public void testIgnoreCase() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
@@ -671,7 +672,8 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
boolean testSchemaChange,
String databaseName)
throws Exception {
- JobClient client =
buildSyncDatabaseActionWithNewlyAddedTables(databaseName);
+ JobClient client =
+ buildSyncDatabaseActionWithNewlyAddedTables(databaseName,
testSchemaChange);
waitJobRunning(client);
try (Connection conn =
@@ -759,7 +761,9 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
.join();
assertThat(savepoint).isNotBlank();
- client = buildSyncDatabaseActionWithNewlyAddedTables(savepoint,
databaseName);
+ client =
+ buildSyncDatabaseActionWithNewlyAddedTables(
+ savepoint, databaseName, testSchemaChange);
waitJobRunning(client);
}
@@ -832,6 +836,10 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
},
new String[] {"k", "v1", "v2"});
waitForResult(expectedRecords, newTable, rowType,
newTablePrimaryKeys);
+
+ // test that catalog loader works
+ assertThat(getFileStoreTable(tableName).options())
+ .containsEntry("alter-table-test", "true");
}
}
@@ -877,13 +885,13 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
"CREATE TABLE %s (k INT, v1 VARCHAR(10), PRIMARY KEY
(k))", newTableName));
}
- private JobClient buildSyncDatabaseActionWithNewlyAddedTables(String
databaseName)
- throws Exception {
- return buildSyncDatabaseActionWithNewlyAddedTables(null, databaseName);
+ private JobClient buildSyncDatabaseActionWithNewlyAddedTables(
+ String databaseName, boolean testSchemaChange) throws Exception {
+ return buildSyncDatabaseActionWithNewlyAddedTables(null, databaseName,
testSchemaChange);
}
private JobClient buildSyncDatabaseActionWithNewlyAddedTables(
- String savepointPath, String databaseName) throws Exception {
+ String savepointPath, String databaseName, boolean
testSchemaChange) throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", databaseName);
@@ -897,6 +905,13 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
Map<String, String> tableConfig = new HashMap<>();
tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1));
tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(2) +
2));
+
+ Map<String, String> catalogConfig =
+ testSchemaChange
+ ? Collections.singletonMap(
+ CatalogOptions.METASTORE.key(),
"test-alter-table")
+ : Collections.emptyMap();
+
MySqlSyncDatabaseAction action =
new MySqlSyncDatabaseAction(
mySqlConfig,
@@ -907,7 +922,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
null,
"t.+",
null,
- Collections.emptyMap(),
+ catalogConfig,
tableConfig,
UNIFIED);
action.build(env);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 30bde7232..8a608e674 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
@@ -79,7 +80,8 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
tableName,
Collections.singletonList("pt"),
Arrays.asList("pt", "_id"),
- Collections.emptyMap(),
+ Collections.singletonMap(
+ CatalogOptions.METASTORE.key(),
"test-alter-table"),
tableConfig);
action.build(env);
JobClient client = env.executeAsync();
@@ -245,6 +247,9 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
"+I[2, 8, very long string, 80000000000, NULL, NULL,
NULL]",
"+I[1, 9, nine, 90000000000, 99999.999, [110, 105,
110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]");
waitForResult(expected, table, rowType, primaryKeys);
+
+ // test that catalog loader works
+
assertThat(getFileStoreTable().options()).containsEntry("alter-table-test",
"true");
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestAlterTableCatalogFactory.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestAlterTableCatalogFactory.java
new file mode 100644
index 000000000..d7f9f7ce1
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestAlterTableCatalogFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.SchemaChange;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Factory to create a mock catalog for catalog loader test. If catalog loader works, the
+ * 'alterTable' method will leave a special option.
+ *
+ * <p>Tests select this factory by setting the catalog's metastore option to {@code
+ * test-alter-table}. They then assert that the table options contain {@code
+ * alter-table-test=true}.
+ */
+public class TestAlterTableCatalogFactory implements CatalogFactory {
+
+    /** Metastore identifier under which this factory is registered. */
+    @Override
+    public String identifier() {
+        return "test-alter-table";
+    }
+
+    /**
+     * Creates a {@link FileSystemCatalog} whose {@code alterTable} appends an extra {@code
+     * alter-table-test=true} option to every batch of schema changes it receives.
+     */
+    @Override
+    public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
+        return new FileSystemCatalog(fileIO, warehouse, context.options().toMap()) {
+
+            @Override
+            public void alterTable(
+                    Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
+                    throws TableNotExistException, ColumnAlreadyExistException,
+                            ColumnNotExistException {
+                // Copy instead of appending in place so the caller's list is never modified
+                // (it may also be immutable - not verified here).
+                List<SchemaChange> newChanges = new ArrayList<>(changes);
+                // Marker option: its presence proves that schema evolution was routed through
+                // Catalog#alterTable rather than committed directly via SchemaManager.
+                newChanges.add(SchemaChange.setOption("alter-table-test", "true"));
+                super.alterTable(identifier, newChanges, ignoreIfNotExists);
+            }
+        };
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
index e0ba24b9a..82caabd7b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
@@ -71,7 +71,8 @@ public class TestCaseInsensitiveCatalogFactory implements
CatalogFactory {
@Override
public void alterTable(
Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
- throws TableNotExistException {
+ throws TableNotExistException, ColumnAlreadyExistException,
+ ColumnNotExistException {
for (SchemaChange change : changes) {
if (change instanceof SchemaChange.AddColumn) {
checkCaseInsensitive(((SchemaChange.AddColumn)
change).fieldName());
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index d3cf58e0f..c8b0a754b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -19,8 +19,10 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -146,6 +148,12 @@ public class FlinkCdcSyncDatabaseSinkITCase extends
AbstractTestBase {
TestCdcSourceFunction sourceFunction = new
TestCdcSourceFunction(events);
DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
source.setParallelism(2);
+
+ Options catalogOptions = new Options();
+ catalogOptions.set("warehouse", tempDir.toString());
+ Catalog.Loader catalogLoader =
+ () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+
new FlinkCdcSyncDatabaseSinkBuilder<TestCdcEvent>()
.withInput(source)
.withParserFactory(TestCdcEventParser::new)
@@ -153,6 +161,8 @@ public class FlinkCdcSyncDatabaseSinkITCase extends
AbstractTestBase {
// because we have at most 3 tables and 8 slots in
AbstractTestBase
// each table can only get 2 slots
.withParallelism(2)
+ .withDatabase(DATABASE_NAME)
+ .withCatalogLoader(catalogLoader)
.build();
// enable failure when running jobs if needed
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index e47cb03d9..a16447364 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -19,7 +19,11 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogUtils;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -55,6 +59,9 @@ import java.util.concurrent.ThreadLocalRandom;
/** IT cases for {@link CdcSinkBuilder}. */
public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase {
+ private static final String DATABASE_NAME = "test";
+ private static final String TABLE_NAME = "test_tbl";
+
@TempDir java.nio.file.Path tempDir;
@Test
@@ -79,16 +86,26 @@ public class FlinkCdcSyncTableSinkITCase extends
AbstractTestBase {
boolean enableFailure = random.nextBoolean();
TestTable testTable =
- new TestTable("test_tbl", numEvents, numSchemaChanges,
numPartitions, numKeys);
+ new TestTable(TABLE_NAME, numEvents, numSchemaChanges,
numPartitions, numKeys);
Path tablePath;
FileIO fileIO;
String failingName = UUID.randomUUID().toString();
if (enableFailure) {
- tablePath = new Path(FailingFileIO.getFailingPath(failingName,
tempDir.toString()));
+ tablePath =
+ new Path(
+ FailingFileIO.getFailingPath(
+ failingName,
+ CatalogUtils.stringifyPath(
+ tempDir.toString(), DATABASE_NAME,
TABLE_NAME)));
fileIO = new FailingFileIO();
} else {
- tablePath = new Path(TraceableFileIO.SCHEME + "://" +
tempDir.toString());
+ tablePath =
+ new Path(
+ TraceableFileIO.SCHEME
+ + "://"
+ + CatalogUtils.stringifyPath(
+ tempDir.toString(), DATABASE_NAME,
TABLE_NAME));
fileIO = LocalFileIO.create();
}
@@ -113,11 +130,19 @@ public class FlinkCdcSyncTableSinkITCase extends
AbstractTestBase {
TestCdcSourceFunction sourceFunction = new
TestCdcSourceFunction(testTable.events());
DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
source.setParallelism(2);
+
+ Options catalogOptions = new Options();
+ catalogOptions.set("warehouse", tempDir.toString());
+ Catalog.Loader catalogLoader =
+ () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+
new CdcSinkBuilder<TestCdcEvent>()
.withInput(source)
.withParserFactory(TestCdcEventParser::new)
.withTable(table)
.withParallelism(3)
+ .withIdentifier(Identifier.create(DATABASE_NAME, TABLE_NAME))
+ .withCatalogLoader(catalogLoader)
.build();
// enable failure when running jobs if needed
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
index 3474fc387..faa8cf673 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
+++
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.paimon.flink.action.cdc.mysql.TestCaseInsensitiveCatalogFactory
+org.apache.paimon.flink.action.cdc.mysql.TestAlterTableCatalogFactory
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 29f26f14f..ea73a0995 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -343,7 +343,7 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public void alterTable(
Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
- throws TableNotExistException {
+ throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
checkNotSystemTable(identifier, "alterTable");
if (!paimonTableExists(identifier)) {
if (ignoreIfNotExists) {
@@ -354,23 +354,19 @@ public class HiveCatalog extends AbstractCatalog {
}
checkFieldNamesUpperCaseInSchemaChange(changes);
- try {
- final SchemaManager schemaManager = schemaManager(identifier);
- // first commit changes to underlying files
- TableSchema schema = schemaManager.commitChanges(changes);
- try {
- // sync to hive hms
- Table table =
- client.getTable(identifier.getDatabaseName(),
identifier.getObjectName());
- updateHmsTable(table, identifier, schema);
- client.alter_table(identifier.getDatabaseName(),
identifier.getObjectName(), table);
- } catch (TException te) {
- schemaManager.deleteSchema(schema.id());
- throw te;
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
+ final SchemaManager schemaManager = schemaManager(identifier);
+ // first commit changes to underlying files
+ TableSchema schema = schemaManager.commitChanges(changes);
+
+ try {
+ // sync to hive hms
+ Table table = client.getTable(identifier.getDatabaseName(),
identifier.getObjectName());
+ updateHmsTable(table, identifier, schema);
+ client.alter_table(identifier.getDatabaseName(),
identifier.getObjectName(), table);
+ } catch (TException te) {
+ schemaManager.deleteSchema(schema.id());
+ throw new RuntimeException(te);
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 3e93d5d8e..a50fe1abb 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -278,7 +278,9 @@ public class SparkCatalog implements TableCatalog,
SupportsNamespaces {
try {
catalog.alterTable(toIdentifier(ident), schemaChanges, false);
return loadTable(ident);
- } catch (Catalog.TableNotExistException e) {
+ } catch (Catalog.TableNotExistException
+ | Catalog.ColumnAlreadyExistException
+ | Catalog.ColumnNotExistException e) {
throw new NoSuchTableException(ident);
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index 7ed2ed2eb..64cf6519b 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -18,8 +18,6 @@
package org.apache.paimon.spark;
-import org.apache.paimon.testutils.assertj.AssertionUtils;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
@@ -31,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static
org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -97,9 +96,10 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
() ->
spark.sql(
"ALTER TABLE testAddNotNullColumn ADD
COLUMNS (d INT NOT NULL)"))
- .isInstanceOf(RuntimeException.class)
- .hasMessage(
- "java.lang.IllegalArgumentException: ADD COLUMN cannot
specify NOT NULL.");
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "ADD COLUMN cannot specify NOT NULL."));
}
@Test
@@ -193,9 +193,10 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
assertThatThrownBy(
() -> spark.sql("ALTER TABLE testRenamePartitionKey
RENAME COLUMN a to aa"))
- .isInstanceOf(RuntimeException.class)
- .hasMessage(
- "java.lang.UnsupportedOperationException: Cannot
drop/rename partition key[a]");
+ .satisfies(
+ anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Cannot drop/rename partition key[a]"));
}
@Test
@@ -242,9 +243,10 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
.contains(showCreateString("testDropPartitionKey", "a BIGINT",
"b STRING"));
assertThatThrownBy(() -> spark.sql("ALTER TABLE testDropPartitionKey
DROP COLUMN a"))
- .isInstanceOf(RuntimeException.class)
- .hasMessage(
- "java.lang.UnsupportedOperationException: Cannot
drop/rename partition key[a]");
+ .satisfies(
+ anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Cannot drop/rename partition key[a]"));
}
@Test
@@ -261,9 +263,10 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
.contains(showCreateString("testDropPrimaryKey", "a BIGINT",
"b STRING"));
assertThatThrownBy(() -> spark.sql("ALTER TABLE testDropPrimaryKey
DROP COLUMN b"))
- .isInstanceOf(RuntimeException.class)
- .hasMessage(
- "java.lang.UnsupportedOperationException: Cannot
drop/rename primary key[b]");
+ .satisfies(
+ anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Cannot drop/rename primary key[b]"));
}
@Test
@@ -294,7 +297,7 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
createTable("tableFirstSelf");
assertThatThrownBy(() -> spark.sql("ALTER TABLE tableFirstSelf ALTER
COLUMN a FIRST"))
.satisfies(
- AssertionUtils.anyCauseMatches(
+ anyCauseMatches(
UnsupportedOperationException.class,
"Cannot move itself for column a"));
@@ -302,7 +305,7 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
createTable("tableAfterSelf");
assertThatThrownBy(() -> spark.sql("ALTER TABLE tableAfterSelf ALTER
COLUMN b AFTER b"))
.satisfies(
- AssertionUtils.anyCauseMatches(
+ anyCauseMatches(
UnsupportedOperationException.class,
"Cannot move itself for column b"));
@@ -375,7 +378,7 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
spark.sql(
"ALTER TABLE testAlterPkNullability
ALTER COLUMN a DROP NOT NULL"))
.satisfies(
- AssertionUtils.anyCauseMatches(
+ anyCauseMatches(
UnsupportedOperationException.class,
"Cannot change nullability of primary key"));
}