This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0b2e87c86 [hotfix] Fix Catalog Exception and close catalog in cdc
functions
0b2e87c86 is described below
commit 0b2e87c866a3277b75da5577646ed626d1a0de00
Author: JingsongLi <[email protected]>
AuthorDate: Sun Jul 9 11:18:22 2023 +0800
[hotfix] Fix Catalog Exception and close catalog in cdc functions
---
.../src/main/java/org/apache/paimon/catalog/Catalog.java | 11 +++++++++++
.../src/main/java/org/apache/paimon/flink/FlinkCatalog.java | 6 +++---
.../sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java | 8 ++++++++
.../flink/sink/cdc/UpdatedDataFieldsProcessFunction.java | 8 ++++++++
.../src/main/java/org/apache/paimon/spark/SparkCatalog.java | 6 +++---
5 files changed, 33 insertions(+), 6 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 0004680c9..426b2f759 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -184,6 +184,17 @@ public interface Catalog extends AutoCloseable {
void alterTable(Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException;
+ /**
+ * Modify an existing table from a {@link SchemaChange}.
+ *
+ * <p>NOTE: System tables can not be altered.
+ *
+ * @param identifier path of the table to be modified
+ * @param change the schema change
+ * @param ignoreIfNotExists flag to specify behavior when the table does
not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @throws TableNotExistException if the table does not exist
+ */
default void alterTable(Identifier identifier, SchemaChange change,
boolean ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
alterTable(identifier, Collections.singletonList(change),
ignoreIfNotExists);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index f3df315bb..b5d58b51a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -384,10 +384,10 @@ public class FlinkCatalog extends AbstractCatalog {
try {
catalog.alterTable(toIdentifier(tablePath), changes,
ignoreIfNotExists);
- } catch (Catalog.TableNotExistException
- | Catalog.ColumnAlreadyExistException
- | Catalog.ColumnNotExistException e) {
+ } catch (Catalog.TableNotExistException e) {
throw new TableNotExistException(getName(), tablePath);
+ } catch (Catalog.ColumnAlreadyExistException |
Catalog.ColumnNotExistException e) {
+ throw new CatalogException(e);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index 834930355..c0572b3a2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -102,6 +102,14 @@ public class MultiTableUpdatedDataFieldsProcessFunction
}
}
+ @Override
+ public void close() throws Exception {
+ if (catalog != null) {
+ catalog.close();
+ catalog = null;
+ }
+ }
+
private List<SchemaChange> extractSchemaChanges(
SchemaManager schemaManager, List<DataField> updatedDataFields) {
RowType oldRowType = schemaManager.latest().get().logicalRowType();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index b179dbc9e..08db4c70d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -83,6 +83,14 @@ public class UpdatedDataFieldsProcessFunction extends
ProcessFunction<List<DataF
}
}
+ @Override
+ public void close() throws Exception {
+ if (catalog != null) {
+ catalog.close();
+ catalog = null;
+ }
+ }
+
private List<SchemaChange> extractSchemaChanges(List<DataField>
updatedDataFields) {
RowType oldRowType = schemaManager.latest().get().logicalRowType();
Map<String, DataField> oldFields = new HashMap<>();
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index a50fe1abb..e6a671eac 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -278,10 +278,10 @@ public class SparkCatalog implements TableCatalog,
SupportsNamespaces {
try {
catalog.alterTable(toIdentifier(ident), schemaChanges, false);
return loadTable(ident);
- } catch (Catalog.TableNotExistException
- | Catalog.ColumnAlreadyExistException
- | Catalog.ColumnNotExistException e) {
+ } catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
+ } catch (Catalog.ColumnAlreadyExistException |
Catalog.ColumnNotExistException e) {
+ throw new RuntimeException(e);
}
}