This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a0a3c652d Revert "[hotfix] add column 'already exists' and 'does not 
exist' exception type (#902)"
a0a3c652d is described below

commit a0a3c652df620eb5aeb79f208eb74762fdb4e90e
Author: tsreaper <[email protected]>
AuthorDate: Mon Apr 17 20:37:49 2023 +0800

    Revert "[hotfix] add column 'already exists' and 'does not exist' exception 
type (#902)"
    
    This reverts commit b9e6df62
---
 .../java/org/apache/paimon/catalog/Catalog.java    | 54 ----------------------
 .../org/apache/paimon/schema/SchemaManager.java    | 36 ++++++++-------
 .../org/apache/paimon/catalog/CatalogTestBase.java | 33 +++++--------
 .../apache/paimon/table/SchemaEvolutionTest.java   | 21 ++++-----
 .../sink/cdc/UpdatedDataFieldsProcessFunction.java |  3 +-
 .../sink/cdc/FlinkCdcSyncTableSinkITCase.java      | 19 ++------
 6 files changed, 43 insertions(+), 123 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index c8d7931db..45a828b79 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -283,58 +283,4 @@ public interface Catalog extends AutoCloseable {
             return identifier;
         }
     }
-
-    /** Exception for trying to alter a column that already exists. */
-    class ColumnAlreadyExistException extends Exception {
-
-        private static final String MSG = "Column %s already exists in the %s 
table.";
-
-        private final Identifier identifier;
-        private final String column;
-
-        public ColumnAlreadyExistException(Identifier identifier, String 
column) {
-            this(identifier, column, null);
-        }
-
-        public ColumnAlreadyExistException(Identifier identifier, String 
column, Throwable cause) {
-            super(String.format(MSG, column, identifier.getFullName()), cause);
-            this.identifier = identifier;
-            this.column = column;
-        }
-
-        public Identifier identifier() {
-            return identifier;
-        }
-
-        public String column() {
-            return column;
-        }
-    }
-
-    /** Exception for trying to operate on a table that doesn't exist. */
-    class ColumnNotExistException extends Exception {
-
-        private static final String MSG = "Column %s does not exist in the %s 
table.";
-
-        private final Identifier identifier;
-        private final String column;
-
-        public ColumnNotExistException(Identifier identifier, String column) {
-            this(identifier, column, null);
-        }
-
-        public ColumnNotExistException(Identifier identifier, String column, 
Throwable cause) {
-            super(String.format(MSG, column, identifier.getFullName()), cause);
-            this.identifier = identifier;
-            this.column = column;
-        }
-
-        public Identifier identifier() {
-            return identifier;
-        }
-
-        public String column() {
-            return column;
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index a9213668b..9eb6d349e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -21,8 +21,6 @@ package org.apache.paimon.schema;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.casting.CastExecutors;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.Lock;
@@ -70,14 +68,12 @@ public class SchemaManager implements Serializable {
 
     private final FileIO fileIO;
     private final Path tableRoot;
-    private final Identifier identifier;
 
     @Nullable private transient Lock lock;
 
     public SchemaManager(FileIO fileIO, Path tableRoot) {
         this.fileIO = fileIO;
         this.tableRoot = tableRoot;
-        this.identifier = Identifier.fromPath(tableRoot);
     }
 
     public SchemaManager withLock(@Nullable Lock lock) {
@@ -190,7 +186,8 @@ public class SchemaManager implements Serializable {
     public TableSchema commitChanges(List<SchemaChange> changes) throws 
Exception {
         while (true) {
             TableSchema schema =
-                    latest().orElseThrow(() -> new 
Catalog.TableNotExistException(identifier));
+                    latest().orElseThrow(
+                                    () -> new RuntimeException("Table not 
exists: " + tableRoot));
             Map<String, String> newOptions = new HashMap<>(schema.options());
             List<DataField> newFields = new ArrayList<>(schema.fields());
             AtomicInteger highestFieldId = new 
AtomicInteger(schema.highestFieldId());
@@ -207,8 +204,10 @@ public class SchemaManager implements Serializable {
                     AddColumn addColumn = (AddColumn) change;
                     SchemaChange.Move move = addColumn.move();
                     if (newFields.stream().anyMatch(f -> 
f.name().equals(addColumn.fieldName()))) {
-                        throw new Catalog.ColumnAlreadyExistException(
-                                identifier, addColumn.fieldName());
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "The column [%s] exists in the 
table[%s].",
+                                        addColumn.fieldName(), tableRoot));
                     }
                     Preconditions.checkArgument(
                             addColumn.dataType().isNullable(),
@@ -242,7 +241,10 @@ public class SchemaManager implements Serializable {
                     RenameColumn rename = (RenameColumn) change;
                     validateNotPrimaryAndPartitionKey(schema, 
rename.fieldName());
                     if (newFields.stream().anyMatch(f -> 
f.name().equals(rename.newName()))) {
-                        throw new 
Catalog.ColumnAlreadyExistException(identifier, rename.newName());
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "The column [%s] exists in the 
table[%s].",
+                                        rename.newName(), tableRoot));
                     }
 
                     updateNestedColumn(
@@ -260,7 +262,10 @@ public class SchemaManager implements Serializable {
                     validateNotPrimaryAndPartitionKey(schema, 
drop.fieldName());
                     if (!newFields.removeIf(
                             f -> f.name().equals(((DropColumn) 
change).fieldName()))) {
-                        throw new Catalog.ColumnNotExistException(identifier, 
drop.fieldName());
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "The column [%s] doesn't exist in the 
table[%s].",
+                                        drop.fieldName(), tableRoot));
                     }
                     if (newFields.isEmpty()) {
                         throw new IllegalArgumentException("Cannot drop all 
fields in table");
@@ -270,8 +275,8 @@ public class SchemaManager implements Serializable {
                     if (schema.partitionKeys().contains(update.fieldName())) {
                         throw new IllegalArgumentException(
                                 String.format(
-                                        "Cannot update partition column [%s] 
type in the table [%s].",
-                                        update.fieldName(), 
identifier.getFullName()));
+                                        "Cannot update partition column [%s] 
type in the table[%s].",
+                                        update.fieldName(), tableRoot));
                     }
                     updateColumn(
                             newFields,
@@ -398,8 +403,7 @@ public class SchemaManager implements Serializable {
             List<DataField> newFields,
             String[] updateFieldNames,
             int index,
-            Function<DataField, DataField> updateFunc)
-            throws Catalog.ColumnNotExistException {
+            Function<DataField, DataField> updateFunc) {
         boolean found = false;
         for (int i = 0; i < newFields.size(); i++) {
             DataField field = newFields.get(i);
@@ -425,16 +429,14 @@ public class SchemaManager implements Serializable {
             }
         }
         if (!found) {
-            throw new Catalog.ColumnNotExistException(
-                    identifier, Arrays.toString(updateFieldNames));
+            throw new RuntimeException("Can not find column: " + 
Arrays.asList(updateFieldNames));
         }
     }
 
     private void updateColumn(
             List<DataField> newFields,
             String updateFieldName,
-            Function<DataField, DataField> updateFunc)
-            throws Catalog.ColumnNotExistException {
+            Function<DataField, DataField> updateFunc) {
         updateNestedColumn(newFields, new String[] {updateFieldName}, 0, 
updateFunc);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index deb134dec..321cbbc28 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -439,7 +439,7 @@ public abstract class CatalogTestBase {
                                         false))
                 .withMessage("Table test_db.non_existing_table does not 
exist.");
 
-        // Alter table adds a column throws ColumnAlreadyExistException when 
column already exists
+        // Alter table adds a column throws Exception when column already 
exists
         assertThatThrownBy(
                         () ->
                                 catalog.alterTable(
@@ -447,8 +447,8 @@ public abstract class CatalogTestBase {
                                         Lists.newArrayList(
                                                 SchemaChange.addColumn("col1", 
DataTypes.INT())),
                                         false))
-                
.hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class)
-                .hasRootCauseMessage("Column col1 already exists in the 
test_db.test_table table.");
+                .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("The column [col1] exists in the table");
     }
 
     @Test
@@ -476,8 +476,7 @@ public abstract class CatalogTestBase {
         assertThat(table.rowType().getFieldIndex("col1")).isLessThan(0);
         assertThat(table.rowType().getFieldIndex("new_col1")).isEqualTo(0);
 
-        // Alter table renames a new column throws ColumnAlreadyExistException 
when column already
-        // exists
+        // Alter table renames a new column throws Exception when column 
already exists
         assertThatThrownBy(
                         () ->
                                 catalog.alterTable(
@@ -485,11 +484,10 @@ public abstract class CatalogTestBase {
                                         Lists.newArrayList(
                                                 
SchemaChange.renameColumn("col1", "new_col1")),
                                         false))
-                
.hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class)
-                .hasRootCauseMessage(
-                        "Column new_col1 already exists in the 
test_db.test_table table.");
+                .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("The column [new_col1] exists in the 
table");
 
-        // Alter table renames a column throws ColumnNotExistException when 
column does not exist
+        // Alter table renames a column throws Exception when column does not 
exist
         assertThatThrownBy(
                         () ->
                                 catalog.alterTable(
@@ -498,9 +496,7 @@ public abstract class CatalogTestBase {
                                                 SchemaChange.renameColumn(
                                                         "non_existing_col", 
"new_col2")),
                                         false))
-                .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
-                .hasRootCauseMessage(
-                        "Column [non_existing_col] does not exist in the 
test_db.test_table table.");
+                .hasMessageContaining("Can not find column: 
[non_existing_col]");
     }
 
     @Test
@@ -536,7 +532,7 @@ public abstract class CatalogTestBase {
                 .hasRootCauseInstanceOf(IllegalArgumentException.class)
                 .hasMessageContaining(" Cannot drop all fields in table");
 
-        // Alter table drop a column ColumnNotExistException when column does 
not exist
+        // Alter table drop a column throws Exception when column does not 
exist
         assertThatThrownBy(
                         () ->
                                 catalog.alterTable(
@@ -544,9 +540,7 @@ public abstract class CatalogTestBase {
                                         Lists.newArrayList(
                                                 
SchemaChange.dropColumn("non_existing_col")),
                                         false))
-                .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
-                .hasRootCauseMessage(
-                        "Column non_existing_col does not exist in the 
test_db.test_table table.");
+                .hasMessageContaining("The column [non_existing_col] doesn't 
exist in the table");
     }
 
     @Test
@@ -589,8 +583,7 @@ public abstract class CatalogTestBase {
                 .hasMessageContaining(
                         "Column type col1[DOUBLE] cannot be converted to 
STRING without loosing information.");
 
-        // Alter table update a column type throws ColumnNotExistException 
when column does not
-        // exist
+        // Alter table update a column type throws Exception when column does 
not exist
         assertThatThrownBy(
                         () ->
                                 catalog.alterTable(
@@ -599,9 +592,7 @@ public abstract class CatalogTestBase {
                                                 SchemaChange.updateColumnType(
                                                         "non_existing_col", 
DataTypes.INT())),
                                         false))
-                .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
-                .hasRootCauseMessage(
-                        "Column [non_existing_col] does not exist in the 
test_db.test_table table.");
+                .hasMessageContaining("Can not find column: 
[non_existing_col]");
 
         // Alter table update a column type throws Exception when column is 
partition columns
         assertThatThrownBy(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index ad6f514cc..f2578c15c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.table;
 
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.DataFormatTestUtil;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -66,14 +64,12 @@ public class SchemaEvolutionTest {
     private Path tablePath;
     private SchemaManager schemaManager;
     private String commitUser;
-    private String tableFullName;
 
     @BeforeEach
     public void beforeEach() {
-        tablePath = new Path(tempDir.toUri().toString(), "test_db/test_table");
+        tablePath = new Path(tempDir.toUri());
         schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
         commitUser = UUID.randomUUID().toString();
-        tableFullName = Identifier.fromPath(tablePath).getFullName();
     }
 
     @Test
@@ -157,8 +153,8 @@ public class SchemaEvolutionTest {
                                         Collections.singletonList(
                                                 SchemaChange.addColumn(
                                                         columnName, new 
FloatType()))))
-                .isInstanceOf(Catalog.ColumnAlreadyExistException.class)
-                .hasMessage("Column %s already exists in the %s table.", 
columnName, tableFullName);
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("The column [%s] exists in the table[%s].", 
columnName, tablePath);
     }
 
     @Test
@@ -214,10 +210,9 @@ public class SchemaEvolutionTest {
                                 schemaManager.commitChanges(
                                         Collections.singletonList(
                                                 
SchemaChange.renameColumn("f0", "f1"))))
-                .isInstanceOf(Catalog.ColumnAlreadyExistException.class)
+                .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(
-                        String.format(
-                                "Column %s already exists in the %s table.", 
"f1", tableFullName));
+                        String.format("The column [%s] exists in the 
table[%s].", "f1", tablePath));
     }
 
     @Test
@@ -271,11 +266,11 @@ public class SchemaEvolutionTest {
                         () ->
                                 schemaManager.commitChanges(
                                         
Collections.singletonList(SchemaChange.dropColumn("f100"))))
-                .isInstanceOf(Catalog.ColumnNotExistException.class)
+                .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(
                         String.format(
-                                "Column %s does not exist in the %s table.",
-                                "f100", tableFullName));
+                                "The column [%s] doesn't exist in the 
table[%s].",
+                                "f100", tablePath));
 
         assertThatThrownBy(
                         () ->
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index cce1ee7e1..44f748dfe 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -108,7 +107,7 @@ public class UpdatedDataFieldsProcessFunction extends 
ProcessFunction<List<DataF
         if (schemaChange instanceof SchemaChange.AddColumn) {
             try {
                 schemaManager.commitChanges(schemaChange);
-            } catch (Catalog.ColumnAlreadyExistException e) {
+            } catch (IllegalArgumentException e) {
                 // This is normal. For example when a table is split into 
multiple database tables,
                 // all these tables will be added the same column. However 
schemaManager can't
                 // handle duplicated column adds, so we just catch the 
exception and log it.
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 7d132b4da..45d6b34e5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.FileIO;
@@ -56,8 +55,6 @@ import java.util.concurrent.ThreadLocalRandom;
 public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase {
 
     @TempDir java.nio.file.Path tempDir;
-    private static final String DATABASE_NAME = "test";
-    private static final String TABLE_NAME = "test_tbl";
 
     @Test
     @Timeout(120)
@@ -72,26 +69,16 @@ public class FlinkCdcSyncTableSinkITCase extends 
AbstractTestBase {
         boolean enableFailure = random.nextBoolean();
 
         TestTable testTable =
-                new TestTable(TABLE_NAME, numEvents, numSchemaChanges, 
numPartitions, numKeys);
+                new TestTable("test_tbl", numEvents, numSchemaChanges, 
numPartitions, numKeys);
 
         Path tablePath;
         FileIO fileIO;
         String failingName = UUID.randomUUID().toString();
-
         if (enableFailure) {
-            tablePath =
-                    new Path(
-                            FailingFileIO.getFailingPath(
-                                    failingName,
-                                    CatalogUtils.stringifyPath(
-                                            tempDir.toString(), DATABASE_NAME, 
TABLE_NAME)));
+            tablePath = new Path(FailingFileIO.getFailingPath(failingName, 
tempDir.toString()));
             fileIO = new FailingFileIO();
         } else {
-            tablePath =
-                    new Path(
-                            TraceableFileIO.SCHEME + "://",
-                            CatalogUtils.stringifyPath(
-                                    tempDir.toString(), DATABASE_NAME, 
TABLE_NAME));
+            tablePath = new Path(TraceableFileIO.SCHEME + "://" + 
tempDir.toString());
             fileIO = LocalFileIO.create();
         }
 

Reply via email to