This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b9e6df625 [hotfix] add column 'already exists' and 'does not exist'
exception type (#902)
b9e6df625 is described below
commit b9e6df6259c9815b5f1a08809897067859e0caca
Author: Kerwin <[email protected]>
AuthorDate: Mon Apr 17 11:46:48 2023 +0800
[hotfix] add column 'already exists' and 'does not exist' exception type
(#902)
---
.../java/org/apache/paimon/catalog/Catalog.java | 54 ++++++++++++++++++++++
.../org/apache/paimon/schema/SchemaManager.java | 36 +++++++--------
.../org/apache/paimon/catalog/CatalogTestBase.java | 33 ++++++++-----
.../apache/paimon/table/SchemaEvolutionTest.java | 21 +++++----
.../sink/cdc/SchemaChangeProcessFunction.java | 3 +-
.../sink/cdc/FlinkCdcSyncTableSinkITCase.java | 19 ++++++--
6 files changed, 123 insertions(+), 43 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 45a828b79..c8d7931db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -283,4 +283,58 @@ public interface Catalog extends AutoCloseable {
return identifier;
}
}
+
+ /** Exception for trying to alter a column that already exists. */
+ class ColumnAlreadyExistException extends Exception {
+
+ private static final String MSG = "Column %s already exists in the %s
table.";
+
+ private final Identifier identifier;
+ private final String column;
+
+ public ColumnAlreadyExistException(Identifier identifier, String
column) {
+ this(identifier, column, null);
+ }
+
+ public ColumnAlreadyExistException(Identifier identifier, String
column, Throwable cause) {
+ super(String.format(MSG, column, identifier.getFullName()), cause);
+ this.identifier = identifier;
+ this.column = column;
+ }
+
+ public Identifier identifier() {
+ return identifier;
+ }
+
+ public String column() {
+ return column;
+ }
+ }
+
+ /** Exception for trying to operate on a table that doesn't exist. */
+ class ColumnNotExistException extends Exception {
+
+ private static final String MSG = "Column %s does not exist in the %s
table.";
+
+ private final Identifier identifier;
+ private final String column;
+
+ public ColumnNotExistException(Identifier identifier, String column) {
+ this(identifier, column, null);
+ }
+
+ public ColumnNotExistException(Identifier identifier, String column,
Throwable cause) {
+ super(String.format(MSG, column, identifier.getFullName()), cause);
+ this.identifier = identifier;
+ this.column = column;
+ }
+
+ public Identifier identifier() {
+ return identifier;
+ }
+
+ public String column() {
+ return column;
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 9eb6d349e..a9213668b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -21,6 +21,8 @@ package org.apache.paimon.schema;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
@@ -68,12 +70,14 @@ public class SchemaManager implements Serializable {
private final FileIO fileIO;
private final Path tableRoot;
+ private final Identifier identifier;
@Nullable private transient Lock lock;
public SchemaManager(FileIO fileIO, Path tableRoot) {
this.fileIO = fileIO;
this.tableRoot = tableRoot;
+ this.identifier = Identifier.fromPath(tableRoot);
}
public SchemaManager withLock(@Nullable Lock lock) {
@@ -186,8 +190,7 @@ public class SchemaManager implements Serializable {
public TableSchema commitChanges(List<SchemaChange> changes) throws
Exception {
while (true) {
TableSchema schema =
- latest().orElseThrow(
- () -> new RuntimeException("Table not
exists: " + tableRoot));
+ latest().orElseThrow(() -> new
Catalog.TableNotExistException(identifier));
Map<String, String> newOptions = new HashMap<>(schema.options());
List<DataField> newFields = new ArrayList<>(schema.fields());
AtomicInteger highestFieldId = new
AtomicInteger(schema.highestFieldId());
@@ -204,10 +207,8 @@ public class SchemaManager implements Serializable {
AddColumn addColumn = (AddColumn) change;
SchemaChange.Move move = addColumn.move();
if (newFields.stream().anyMatch(f ->
f.name().equals(addColumn.fieldName()))) {
- throw new IllegalArgumentException(
- String.format(
- "The column [%s] exists in the
table[%s].",
- addColumn.fieldName(), tableRoot));
+ throw new Catalog.ColumnAlreadyExistException(
+ identifier, addColumn.fieldName());
}
Preconditions.checkArgument(
addColumn.dataType().isNullable(),
@@ -241,10 +242,7 @@ public class SchemaManager implements Serializable {
RenameColumn rename = (RenameColumn) change;
validateNotPrimaryAndPartitionKey(schema,
rename.fieldName());
if (newFields.stream().anyMatch(f ->
f.name().equals(rename.newName()))) {
- throw new IllegalArgumentException(
- String.format(
- "The column [%s] exists in the
table[%s].",
- rename.newName(), tableRoot));
+ throw new
Catalog.ColumnAlreadyExistException(identifier, rename.newName());
}
updateNestedColumn(
@@ -262,10 +260,7 @@ public class SchemaManager implements Serializable {
validateNotPrimaryAndPartitionKey(schema,
drop.fieldName());
if (!newFields.removeIf(
f -> f.name().equals(((DropColumn)
change).fieldName()))) {
- throw new IllegalArgumentException(
- String.format(
- "The column [%s] doesn't exist in the
table[%s].",
- drop.fieldName(), tableRoot));
+ throw new Catalog.ColumnNotExistException(identifier,
drop.fieldName());
}
if (newFields.isEmpty()) {
throw new IllegalArgumentException("Cannot drop all
fields in table");
@@ -275,8 +270,8 @@ public class SchemaManager implements Serializable {
if (schema.partitionKeys().contains(update.fieldName())) {
throw new IllegalArgumentException(
String.format(
- "Cannot update partition column [%s]
type in the table[%s].",
- update.fieldName(), tableRoot));
+ "Cannot update partition column [%s]
type in the table [%s].",
+ update.fieldName(),
identifier.getFullName()));
}
updateColumn(
newFields,
@@ -403,7 +398,8 @@ public class SchemaManager implements Serializable {
List<DataField> newFields,
String[] updateFieldNames,
int index,
- Function<DataField, DataField> updateFunc) {
+ Function<DataField, DataField> updateFunc)
+ throws Catalog.ColumnNotExistException {
boolean found = false;
for (int i = 0; i < newFields.size(); i++) {
DataField field = newFields.get(i);
@@ -429,14 +425,16 @@ public class SchemaManager implements Serializable {
}
}
if (!found) {
- throw new RuntimeException("Can not find column: " +
Arrays.asList(updateFieldNames));
+ throw new Catalog.ColumnNotExistException(
+ identifier, Arrays.toString(updateFieldNames));
}
}
private void updateColumn(
List<DataField> newFields,
String updateFieldName,
- Function<DataField, DataField> updateFunc) {
+ Function<DataField, DataField> updateFunc)
+ throws Catalog.ColumnNotExistException {
updateNestedColumn(newFields, new String[] {updateFieldName}, 0,
updateFunc);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 321cbbc28..deb134dec 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -439,7 +439,7 @@ public abstract class CatalogTestBase {
false))
.withMessage("Table test_db.non_existing_table does not
exist.");
- // Alter table adds a column throws Exception when column already
exists
+ // Alter table adds a column throws ColumnAlreadyExistException when
column already exists
assertThatThrownBy(
() ->
catalog.alterTable(
@@ -447,8 +447,8 @@ public abstract class CatalogTestBase {
Lists.newArrayList(
SchemaChange.addColumn("col1",
DataTypes.INT())),
false))
- .hasRootCauseInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("The column [col1] exists in the table");
+
.hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class)
+ .hasRootCauseMessage("Column col1 already exists in the
test_db.test_table table.");
}
@Test
@@ -476,7 +476,8 @@ public abstract class CatalogTestBase {
assertThat(table.rowType().getFieldIndex("col1")).isLessThan(0);
assertThat(table.rowType().getFieldIndex("new_col1")).isEqualTo(0);
- // Alter table renames a new column throws Exception when column
already exists
+ // Alter table renames a new column throws ColumnAlreadyExistException
when column already
+ // exists
assertThatThrownBy(
() ->
catalog.alterTable(
@@ -484,10 +485,11 @@ public abstract class CatalogTestBase {
Lists.newArrayList(
SchemaChange.renameColumn("col1", "new_col1")),
false))
- .hasRootCauseInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("The column [new_col1] exists in the
table");
+
.hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class)
+ .hasRootCauseMessage(
+ "Column new_col1 already exists in the
test_db.test_table table.");
- // Alter table renames a column throws Exception when column does not
exist
+ // Alter table renames a column throws ColumnNotExistException when
column does not exist
assertThatThrownBy(
() ->
catalog.alterTable(
@@ -496,7 +498,9 @@ public abstract class CatalogTestBase {
SchemaChange.renameColumn(
"non_existing_col",
"new_col2")),
false))
- .hasMessageContaining("Can not find column:
[non_existing_col]");
+ .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
+ .hasRootCauseMessage(
+ "Column [non_existing_col] does not exist in the
test_db.test_table table.");
}
@Test
@@ -532,7 +536,7 @@ public abstract class CatalogTestBase {
.hasRootCauseInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(" Cannot drop all fields in table");
- // Alter table drop a column throws Exception when column does not
exist
+ // Alter table drop a column ColumnNotExistException when column does
not exist
assertThatThrownBy(
() ->
catalog.alterTable(
@@ -540,7 +544,9 @@ public abstract class CatalogTestBase {
Lists.newArrayList(
SchemaChange.dropColumn("non_existing_col")),
false))
- .hasMessageContaining("The column [non_existing_col] doesn't
exist in the table");
+ .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
+ .hasRootCauseMessage(
+ "Column non_existing_col does not exist in the
test_db.test_table table.");
}
@Test
@@ -583,7 +589,8 @@ public abstract class CatalogTestBase {
.hasMessageContaining(
"Column type col1[DOUBLE] cannot be converted to
STRING without loosing information.");
- // Alter table update a column type throws Exception when column does
not exist
+ // Alter table update a column type throws ColumnNotExistException
when column does not
+ // exist
assertThatThrownBy(
() ->
catalog.alterTable(
@@ -592,7 +599,9 @@ public abstract class CatalogTestBase {
SchemaChange.updateColumnType(
"non_existing_col",
DataTypes.INT())),
false))
- .hasMessageContaining("Can not find column:
[non_existing_col]");
+ .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
+ .hasRootCauseMessage(
+ "Column [non_existing_col] does not exist in the
test_db.test_table table.");
// Alter table update a column type throws Exception when column is
partition columns
assertThatThrownBy(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index f2578c15c..ad6f514cc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -18,6 +18,8 @@
package org.apache.paimon.table;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.DataFormatTestUtil;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -64,12 +66,14 @@ public class SchemaEvolutionTest {
private Path tablePath;
private SchemaManager schemaManager;
private String commitUser;
+ private String tableFullName;
@BeforeEach
public void beforeEach() {
- tablePath = new Path(tempDir.toUri());
+ tablePath = new Path(tempDir.toUri().toString(), "test_db/test_table");
schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
commitUser = UUID.randomUUID().toString();
+ tableFullName = Identifier.fromPath(tablePath).getFullName();
}
@Test
@@ -153,8 +157,8 @@ public class SchemaEvolutionTest {
Collections.singletonList(
SchemaChange.addColumn(
columnName, new
FloatType()))))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("The column [%s] exists in the table[%s].",
columnName, tablePath);
+ .isInstanceOf(Catalog.ColumnAlreadyExistException.class)
+ .hasMessage("Column %s already exists in the %s table.",
columnName, tableFullName);
}
@Test
@@ -210,9 +214,10 @@ public class SchemaEvolutionTest {
schemaManager.commitChanges(
Collections.singletonList(
SchemaChange.renameColumn("f0", "f1"))))
- .isInstanceOf(IllegalArgumentException.class)
+ .isInstanceOf(Catalog.ColumnAlreadyExistException.class)
.hasMessage(
- String.format("The column [%s] exists in the
table[%s].", "f1", tablePath));
+ String.format(
+ "Column %s already exists in the %s table.",
"f1", tableFullName));
}
@Test
@@ -266,11 +271,11 @@ public class SchemaEvolutionTest {
() ->
schemaManager.commitChanges(
Collections.singletonList(SchemaChange.dropColumn("f100"))))
- .isInstanceOf(IllegalArgumentException.class)
+ .isInstanceOf(Catalog.ColumnNotExistException.class)
.hasMessage(
String.format(
- "The column [%s] doesn't exist in the
table[%s].",
- "f100", tablePath));
+ "Column %s does not exist in the %s table.",
+ "f100", tableFullName));
assertThatThrownBy(
() ->
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
index d8ad51d38..4c94444f0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink.cdc;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -57,7 +58,7 @@ public class SchemaChangeProcessFunction extends
ProcessFunction<SchemaChange, V
if (schemaChange instanceof SchemaChange.AddColumn) {
try {
schemaManager.commitChanges(schemaChange);
- } catch (IllegalArgumentException e) {
+ } catch (Catalog.ColumnAlreadyExistException e) {
// This is normal. For example when a table is split into
multiple database tables,
// all these tables will be added the same column. However
schemaManager can't
// handle duplicated column adds, so we just catch the
exception and log it.
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 45d6b34e5..7d132b4da 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
@@ -55,6 +56,8 @@ import java.util.concurrent.ThreadLocalRandom;
public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase {
@TempDir java.nio.file.Path tempDir;
+ private static final String DATABASE_NAME = "test";
+ private static final String TABLE_NAME = "test_tbl";
@Test
@Timeout(120)
@@ -69,16 +72,26 @@ public class FlinkCdcSyncTableSinkITCase extends
AbstractTestBase {
boolean enableFailure = random.nextBoolean();
TestTable testTable =
- new TestTable("test_tbl", numEvents, numSchemaChanges,
numPartitions, numKeys);
+ new TestTable(TABLE_NAME, numEvents, numSchemaChanges,
numPartitions, numKeys);
Path tablePath;
FileIO fileIO;
String failingName = UUID.randomUUID().toString();
+
if (enableFailure) {
- tablePath = new Path(FailingFileIO.getFailingPath(failingName,
tempDir.toString()));
+ tablePath =
+ new Path(
+ FailingFileIO.getFailingPath(
+ failingName,
+ CatalogUtils.stringifyPath(
+ tempDir.toString(), DATABASE_NAME,
TABLE_NAME)));
fileIO = new FailingFileIO();
} else {
- tablePath = new Path(TraceableFileIO.SCHEME + "://" +
tempDir.toString());
+ tablePath =
+ new Path(
+ TraceableFileIO.SCHEME + "://",
+ CatalogUtils.stringifyPath(
+ tempDir.toString(), DATABASE_NAME,
TABLE_NAME));
fileIO = LocalFileIO.create();
}