This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b2e87c86 [hotfix] Fix Catalog Exception and close catalog in cdc functions
0b2e87c86 is described below

commit 0b2e87c866a3277b75da5577646ed626d1a0de00
Author: JingsongLi <[email protected]>
AuthorDate: Sun Jul 9 11:18:22 2023 +0800

    [hotfix] Fix Catalog Exception and close catalog in cdc functions
---
 .../src/main/java/org/apache/paimon/catalog/Catalog.java      | 11 +++++++++++
 .../src/main/java/org/apache/paimon/flink/FlinkCatalog.java   |  6 +++---
 .../sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java  |  8 ++++++++
 .../flink/sink/cdc/UpdatedDataFieldsProcessFunction.java      |  8 ++++++++
 .../src/main/java/org/apache/paimon/spark/SparkCatalog.java   |  6 +++---
 5 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 0004680c9..426b2f759 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -184,6 +184,17 @@ public interface Catalog extends AutoCloseable {
     void alterTable(Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
             throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException;
 
+    /**
+     * Modify an existing table from a {@link SchemaChange}.
+     *
+     * <p>NOTE: System tables can not be altered.
+     *
+     * @param identifier path of the table to be modified
+     * @param change the schema change
+     * @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to
+     *     false, throw an exception, if set to true, do nothing.
+     * @throws TableNotExistException if the table does not exist
+     */
     default void alterTable(Identifier identifier, SchemaChange change, boolean ignoreIfNotExists)
             throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
         alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index f3df315bb..b5d58b51a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -384,10 +384,10 @@ public class FlinkCatalog extends AbstractCatalog {
 
         try {
             catalog.alterTable(toIdentifier(tablePath), changes, ignoreIfNotExists);
-        } catch (Catalog.TableNotExistException
-                | Catalog.ColumnAlreadyExistException
-                | Catalog.ColumnNotExistException e) {
+        } catch (Catalog.TableNotExistException e) {
             throw new TableNotExistException(getName(), tablePath);
+        } catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) {
+            throw new CatalogException(e);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index 834930355..c0572b3a2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -102,6 +102,14 @@ public class MultiTableUpdatedDataFieldsProcessFunction
         }
     }
 
+    @Override
+    public void close() throws Exception {
+        if (catalog != null) {
+            catalog.close();
+            catalog = null;
+        }
+    }
+
     private List<SchemaChange> extractSchemaChanges(
             SchemaManager schemaManager, List<DataField> updatedDataFields) {
         RowType oldRowType = schemaManager.latest().get().logicalRowType();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index b179dbc9e..08db4c70d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -83,6 +83,14 @@ public class UpdatedDataFieldsProcessFunction extends ProcessFunction<List<DataF
         }
     }
 
+    @Override
+    public void close() throws Exception {
+        if (catalog != null) {
+            catalog.close();
+            catalog = null;
+        }
+    }
+
     private List<SchemaChange> extractSchemaChanges(List<DataField> updatedDataFields) {
         RowType oldRowType = schemaManager.latest().get().logicalRowType();
         Map<String, DataField> oldFields = new HashMap<>();
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index a50fe1abb..e6a671eac 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -278,10 +278,10 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         try {
             catalog.alterTable(toIdentifier(ident), schemaChanges, false);
             return loadTable(ident);
-        } catch (Catalog.TableNotExistException
-                | Catalog.ColumnAlreadyExistException
-                | Catalog.ColumnNotExistException e) {
+        } catch (Catalog.TableNotExistException e) {
             throw new NoSuchTableException(ident);
+        } catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) {
+            throw new RuntimeException(e);
         }
     }
 

Reply via email to