This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b9e6df625 [hotfix] add column 'already exists' and 'does not exist' 
exception type (#902)
b9e6df625 is described below

commit b9e6df6259c9815b5f1a08809897067859e0caca
Author: Kerwin <[email protected]>
AuthorDate: Mon Apr 17 11:46:48 2023 +0800

    [hotfix] add column 'already exists' and 'does not exist' exception type 
(#902)
---
 .../java/org/apache/paimon/catalog/Catalog.java    | 54 ++++++++++++++++++++++
 .../org/apache/paimon/schema/SchemaManager.java    | 36 +++++++--------
 .../org/apache/paimon/catalog/CatalogTestBase.java | 33 ++++++++-----
 .../apache/paimon/table/SchemaEvolutionTest.java   | 21 +++++----
 .../sink/cdc/SchemaChangeProcessFunction.java      |  3 +-
 .../sink/cdc/FlinkCdcSyncTableSinkITCase.java      | 19 ++++++--
 6 files changed, 123 insertions(+), 43 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 45a828b79..c8d7931db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -283,4 +283,58 @@ public interface Catalog extends AutoCloseable {
             return identifier;
         }
     }
+
+    /** Exception for trying to alter a column that already exists. */
+    class ColumnAlreadyExistException extends Exception {
+
+        private static final String MSG = "Column %s already exists in the %s 
table.";
+
+        private final Identifier identifier;
+        private final String column;
+
+        public ColumnAlreadyExistException(Identifier identifier, String 
column) {
+            this(identifier, column, null);
+        }
+
+        public ColumnAlreadyExistException(Identifier identifier, String 
column, Throwable cause) {
+            super(String.format(MSG, column, identifier.getFullName()), cause);
+            this.identifier = identifier;
+            this.column = column;
+        }
+
+        public Identifier identifier() {
+            return identifier;
+        }
+
+        public String column() {
+            return column;
+        }
+    }
+
+    /** Exception for trying to operate on a table that doesn't exist. */
+    class ColumnNotExistException extends Exception {
+
+        private static final String MSG = "Column %s does not exist in the %s 
table.";
+
+        private final Identifier identifier;
+        private final String column;
+
+        public ColumnNotExistException(Identifier identifier, String column) {
+            this(identifier, column, null);
+        }
+
+        public ColumnNotExistException(Identifier identifier, String column, 
Throwable cause) {
+            super(String.format(MSG, column, identifier.getFullName()), cause);
+            this.identifier = identifier;
+            this.column = column;
+        }
+
+        public Identifier identifier() {
+            return identifier;
+        }
+
+        public String column() {
+            return column;
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 9eb6d349e..a9213668b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -21,6 +21,8 @@ package org.apache.paimon.schema;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.Lock;
@@ -68,12 +70,14 @@ public class SchemaManager implements Serializable {
 
     private final FileIO fileIO;
     private final Path tableRoot;
+    private final Identifier identifier;
 
     @Nullable private transient Lock lock;
 
     public SchemaManager(FileIO fileIO, Path tableRoot) {
         this.fileIO = fileIO;
         this.tableRoot = tableRoot;
+        this.identifier = Identifier.fromPath(tableRoot);
     }
 
     public SchemaManager withLock(@Nullable Lock lock) {
@@ -186,8 +190,7 @@ public class SchemaManager implements Serializable {
     public TableSchema commitChanges(List<SchemaChange> changes) throws 
Exception {
         while (true) {
             TableSchema schema =
-                    latest().orElseThrow(
-                                    () -> new RuntimeException("Table not 
exists: " + tableRoot));
+                    latest().orElseThrow(() -> new 
Catalog.TableNotExistException(identifier));
             Map<String, String> newOptions = new HashMap<>(schema.options());
             List<DataField> newFields = new ArrayList<>(schema.fields());
             AtomicInteger highestFieldId = new 
AtomicInteger(schema.highestFieldId());
@@ -204,10 +207,8 @@ public class SchemaManager implements Serializable {
                     AddColumn addColumn = (AddColumn) change;
                     SchemaChange.Move move = addColumn.move();
                     if (newFields.stream().anyMatch(f -> 
f.name().equals(addColumn.fieldName()))) {
-                        throw new IllegalArgumentException(
-                                String.format(
-                                        "The column [%s] exists in the 
table[%s].",
-                                        addColumn.fieldName(), tableRoot));
+                        throw new Catalog.ColumnAlreadyExistException(
+                                identifier, addColumn.fieldName());
                     }
                     Preconditions.checkArgument(
                             addColumn.dataType().isNullable(),
@@ -241,10 +242,7 @@ public class SchemaManager implements Serializable {
                     RenameColumn rename = (RenameColumn) change;
                     validateNotPrimaryAndPartitionKey(schema, 
rename.fieldName());
                     if (newFields.stream().anyMatch(f -> 
f.name().equals(rename.newName()))) {
-                        throw new IllegalArgumentException(
-                                String.format(
-                                        "The column [%s] exists in the 
table[%s].",
-                                        rename.newName(), tableRoot));
+                        throw new 
Catalog.ColumnAlreadyExistException(identifier, rename.newName());
                     }
 
                     updateNestedColumn(
@@ -262,10 +260,7 @@ public class SchemaManager implements Serializable {
                     validateNotPrimaryAndPartitionKey(schema, 
drop.fieldName());
                     if (!newFields.removeIf(
                             f -> f.name().equals(((DropColumn) 
change).fieldName()))) {
-                        throw new IllegalArgumentException(
-                                String.format(
-                                        "The column [%s] doesn't exist in the 
table[%s].",
-                                        drop.fieldName(), tableRoot));
+                        throw new Catalog.ColumnNotExistException(identifier, 
drop.fieldName());
                     }
                     if (newFields.isEmpty()) {
                         throw new IllegalArgumentException("Cannot drop all 
fields in table");
@@ -275,8 +270,8 @@ public class SchemaManager implements Serializable {
                     if (schema.partitionKeys().contains(update.fieldName())) {
                         throw new IllegalArgumentException(
                                 String.format(
-                                        "Cannot update partition column [%s] 
type in the table[%s].",
-                                        update.fieldName(), tableRoot));
+                                        "Cannot update partition column [%s] 
type in the table [%s].",
+                                        update.fieldName(), 
identifier.getFullName()));
                     }
                     updateColumn(
                             newFields,
@@ -403,7 +398,8 @@ public class SchemaManager implements Serializable {
             List<DataField> newFields,
             String[] updateFieldNames,
             int index,
-            Function<DataField, DataField> updateFunc) {
+            Function<DataField, DataField> updateFunc)
+            throws Catalog.ColumnNotExistException {
         boolean found = false;
         for (int i = 0; i < newFields.size(); i++) {
             DataField field = newFields.get(i);
@@ -429,14 +425,16 @@ public class SchemaManager implements Serializable {
             }
         }
         if (!found) {
-            throw new RuntimeException("Can not find column: " + 
Arrays.asList(updateFieldNames));
+            throw new Catalog.ColumnNotExistException(
+                    identifier, Arrays.toString(updateFieldNames));
         }
     }
 
     private void updateColumn(
             List<DataField> newFields,
             String updateFieldName,
-            Function<DataField, DataField> updateFunc) {
+            Function<DataField, DataField> updateFunc)
+            throws Catalog.ColumnNotExistException {
         updateNestedColumn(newFields, new String[] {updateFieldName}, 0, 
updateFunc);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 321cbbc28..deb134dec 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -439,7 +439,7 @@ public abstract class CatalogTestBase {
                                         false))
                 .withMessage("Table test_db.non_existing_table does not 
exist.");
 
-        // Alter table adds a column throws Exception when column already 
exists
+        // Alter table adds a column throws ColumnAlreadyExistException when 
column already exists
         assertThatThrownBy(
                         () ->
                                 catalog.alterTable(
@@ -447,8 +447,8 @@ public abstract class CatalogTestBase {
                                         Lists.newArrayList(
                                                 SchemaChange.addColumn("col1", 
DataTypes.INT())),
                                         false))
-                .hasRootCauseInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining("The column [col1] exists in the table");
+                
.hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class)
+                .hasRootCauseMessage("Column col1 already exists in the 
test_db.test_table table.");
     }
 
     @Test
@@ -476,7 +476,8 @@ public abstract class CatalogTestBase {
         assertThat(table.rowType().getFieldIndex("col1")).isLessThan(0);
         assertThat(table.rowType().getFieldIndex("new_col1")).isEqualTo(0);
 
-        // Alter table renames a new column throws Exception when column 
already exists
+        // Alter table renames a new column throws ColumnAlreadyExistException 
when column already
+        // exists
         assertThatThrownBy(
                         () ->
                                 catalog.alterTable(
@@ -484,10 +485,11 @@ public abstract class CatalogTestBase {
                                         Lists.newArrayList(
                                                 
SchemaChange.renameColumn("col1", "new_col1")),
                                         false))
-                .hasRootCauseInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining("The column [new_col1] exists in the 
table");
+                
.hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class)
+                .hasRootCauseMessage(
+                        "Column new_col1 already exists in the 
test_db.test_table table.");
 
-        // Alter table renames a column throws Exception when column does not 
exist
+        // Alter table renames a column throws ColumnNotExistException when 
column does not exist
         assertThatThrownBy(
                         () ->
                                 catalog.alterTable(
@@ -496,7 +498,9 @@ public abstract class CatalogTestBase {
                                                 SchemaChange.renameColumn(
                                                         "non_existing_col", 
"new_col2")),
                                         false))
-                .hasMessageContaining("Can not find column: 
[non_existing_col]");
+                .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
+                .hasRootCauseMessage(
+                        "Column [non_existing_col] does not exist in the 
test_db.test_table table.");
     }
 
     @Test
@@ -532,7 +536,7 @@ public abstract class CatalogTestBase {
                 .hasRootCauseInstanceOf(IllegalArgumentException.class)
                 .hasMessageContaining(" Cannot drop all fields in table");
 
-        // Alter table drop a column throws Exception when column does not 
exist
+        // Alter table drop a column ColumnNotExistException when column does 
not exist
         assertThatThrownBy(
                         () ->
                                 catalog.alterTable(
@@ -540,7 +544,9 @@ public abstract class CatalogTestBase {
                                         Lists.newArrayList(
                                                 
SchemaChange.dropColumn("non_existing_col")),
                                         false))
-                .hasMessageContaining("The column [non_existing_col] doesn't 
exist in the table");
+                .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
+                .hasRootCauseMessage(
+                        "Column non_existing_col does not exist in the 
test_db.test_table table.");
     }
 
     @Test
@@ -583,7 +589,8 @@ public abstract class CatalogTestBase {
                 .hasMessageContaining(
                         "Column type col1[DOUBLE] cannot be converted to 
STRING without loosing information.");
 
-        // Alter table update a column type throws Exception when column does 
not exist
+        // Alter table update a column type throws ColumnNotExistException 
when column does not
+        // exist
         assertThatThrownBy(
                         () ->
                                 catalog.alterTable(
@@ -592,7 +599,9 @@ public abstract class CatalogTestBase {
                                                 SchemaChange.updateColumnType(
                                                         "non_existing_col", 
DataTypes.INT())),
                                         false))
-                .hasMessageContaining("Can not find column: 
[non_existing_col]");
+                .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
+                .hasRootCauseMessage(
+                        "Column [non_existing_col] does not exist in the 
test_db.test_table table.");
 
         // Alter table update a column type throws Exception when column is 
partition columns
         assertThatThrownBy(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index f2578c15c..ad6f514cc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.table;
 
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.DataFormatTestUtil;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -64,12 +66,14 @@ public class SchemaEvolutionTest {
     private Path tablePath;
     private SchemaManager schemaManager;
     private String commitUser;
+    private String tableFullName;
 
     @BeforeEach
     public void beforeEach() {
-        tablePath = new Path(tempDir.toUri());
+        tablePath = new Path(tempDir.toUri().toString(), "test_db/test_table");
         schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
         commitUser = UUID.randomUUID().toString();
+        tableFullName = Identifier.fromPath(tablePath).getFullName();
     }
 
     @Test
@@ -153,8 +157,8 @@ public class SchemaEvolutionTest {
                                         Collections.singletonList(
                                                 SchemaChange.addColumn(
                                                         columnName, new 
FloatType()))))
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessage("The column [%s] exists in the table[%s].", 
columnName, tablePath);
+                .isInstanceOf(Catalog.ColumnAlreadyExistException.class)
+                .hasMessage("Column %s already exists in the %s table.", 
columnName, tableFullName);
     }
 
     @Test
@@ -210,9 +214,10 @@ public class SchemaEvolutionTest {
                                 schemaManager.commitChanges(
                                         Collections.singletonList(
                                                 
SchemaChange.renameColumn("f0", "f1"))))
-                .isInstanceOf(IllegalArgumentException.class)
+                .isInstanceOf(Catalog.ColumnAlreadyExistException.class)
                 .hasMessage(
-                        String.format("The column [%s] exists in the 
table[%s].", "f1", tablePath));
+                        String.format(
+                                "Column %s already exists in the %s table.", 
"f1", tableFullName));
     }
 
     @Test
@@ -266,11 +271,11 @@ public class SchemaEvolutionTest {
                         () ->
                                 schemaManager.commitChanges(
                                         
Collections.singletonList(SchemaChange.dropColumn("f100"))))
-                .isInstanceOf(IllegalArgumentException.class)
+                .isInstanceOf(Catalog.ColumnNotExistException.class)
                 .hasMessage(
                         String.format(
-                                "The column [%s] doesn't exist in the 
table[%s].",
-                                "f100", tablePath));
+                                "Column %s does not exist in the %s table.",
+                                "f100", tableFullName));
 
         assertThatThrownBy(
                         () ->
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
index d8ad51d38..4c94444f0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -57,7 +58,7 @@ public class SchemaChangeProcessFunction extends 
ProcessFunction<SchemaChange, V
         if (schemaChange instanceof SchemaChange.AddColumn) {
             try {
                 schemaManager.commitChanges(schemaChange);
-            } catch (IllegalArgumentException e) {
+            } catch (Catalog.ColumnAlreadyExistException e) {
                 // This is normal. For example when a table is split into 
multiple database tables,
                 // all these tables will be added the same column. However 
schemaManager can't
                 // handle duplicated column adds, so we just catch the 
exception and log it.
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 45d6b34e5..7d132b4da 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.FileIO;
@@ -55,6 +56,8 @@ import java.util.concurrent.ThreadLocalRandom;
 public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase {
 
     @TempDir java.nio.file.Path tempDir;
+    private static final String DATABASE_NAME = "test";
+    private static final String TABLE_NAME = "test_tbl";
 
     @Test
     @Timeout(120)
@@ -69,16 +72,26 @@ public class FlinkCdcSyncTableSinkITCase extends 
AbstractTestBase {
         boolean enableFailure = random.nextBoolean();
 
         TestTable testTable =
-                new TestTable("test_tbl", numEvents, numSchemaChanges, 
numPartitions, numKeys);
+                new TestTable(TABLE_NAME, numEvents, numSchemaChanges, 
numPartitions, numKeys);
 
         Path tablePath;
         FileIO fileIO;
         String failingName = UUID.randomUUID().toString();
+
         if (enableFailure) {
-            tablePath = new Path(FailingFileIO.getFailingPath(failingName, 
tempDir.toString()));
+            tablePath =
+                    new Path(
+                            FailingFileIO.getFailingPath(
+                                    failingName,
+                                    CatalogUtils.stringifyPath(
+                                            tempDir.toString(), DATABASE_NAME, 
TABLE_NAME)));
             fileIO = new FailingFileIO();
         } else {
-            tablePath = new Path(TraceableFileIO.SCHEME + "://" + 
tempDir.toString());
+            tablePath =
+                    new Path(
+                            TraceableFileIO.SCHEME + "://",
+                            CatalogUtils.stringifyPath(
+                                    tempDir.toString(), DATABASE_NAME, 
TABLE_NAME));
             fileIO = LocalFileIO.create();
         }
 

Reply via email to