This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a0a3c652d Revert "[hotfix] add column 'already exists' and 'does not exist' exception type (#902)"
a0a3c652d is described below
commit a0a3c652df620eb5aeb79f208eb74762fdb4e90e
Author: tsreaper <[email protected]>
AuthorDate: Mon Apr 17 20:37:49 2023 +0800
Revert "[hotfix] add column 'already exists' and 'does not exist' exception type (#902)"
This reverts commit b9e6df62
---
.../java/org/apache/paimon/catalog/Catalog.java | 54 ----------------------
.../org/apache/paimon/schema/SchemaManager.java | 36 ++++++++-------
.../org/apache/paimon/catalog/CatalogTestBase.java | 33 +++++--------
.../apache/paimon/table/SchemaEvolutionTest.java | 21 ++++-----
.../sink/cdc/UpdatedDataFieldsProcessFunction.java | 3 +-
.../sink/cdc/FlinkCdcSyncTableSinkITCase.java | 19 ++------
6 files changed, 43 insertions(+), 123 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index c8d7931db..45a828b79 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -283,58 +283,4 @@ public interface Catalog extends AutoCloseable {
return identifier;
}
}
-
- /** Exception for trying to alter a column that already exists. */
- class ColumnAlreadyExistException extends Exception {
-
- private static final String MSG = "Column %s already exists in the %s
table.";
-
- private final Identifier identifier;
- private final String column;
-
- public ColumnAlreadyExistException(Identifier identifier, String
column) {
- this(identifier, column, null);
- }
-
- public ColumnAlreadyExistException(Identifier identifier, String
column, Throwable cause) {
- super(String.format(MSG, column, identifier.getFullName()), cause);
- this.identifier = identifier;
- this.column = column;
- }
-
- public Identifier identifier() {
- return identifier;
- }
-
- public String column() {
- return column;
- }
- }
-
- /** Exception for trying to operate on a table that doesn't exist. */
- class ColumnNotExistException extends Exception {
-
- private static final String MSG = "Column %s does not exist in the %s
table.";
-
- private final Identifier identifier;
- private final String column;
-
- public ColumnNotExistException(Identifier identifier, String column) {
- this(identifier, column, null);
- }
-
- public ColumnNotExistException(Identifier identifier, String column,
Throwable cause) {
- super(String.format(MSG, column, identifier.getFullName()), cause);
- this.identifier = identifier;
- this.column = column;
- }
-
- public Identifier identifier() {
- return identifier;
- }
-
- public String column() {
- return column;
- }
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index a9213668b..9eb6d349e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -21,8 +21,6 @@ package org.apache.paimon.schema;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.casting.CastExecutors;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
@@ -70,14 +68,12 @@ public class SchemaManager implements Serializable {
private final FileIO fileIO;
private final Path tableRoot;
- private final Identifier identifier;
@Nullable private transient Lock lock;
public SchemaManager(FileIO fileIO, Path tableRoot) {
this.fileIO = fileIO;
this.tableRoot = tableRoot;
- this.identifier = Identifier.fromPath(tableRoot);
}
public SchemaManager withLock(@Nullable Lock lock) {
@@ -190,7 +186,8 @@ public class SchemaManager implements Serializable {
public TableSchema commitChanges(List<SchemaChange> changes) throws
Exception {
while (true) {
TableSchema schema =
- latest().orElseThrow(() -> new
Catalog.TableNotExistException(identifier));
+ latest().orElseThrow(
+ () -> new RuntimeException("Table not
exists: " + tableRoot));
Map<String, String> newOptions = new HashMap<>(schema.options());
List<DataField> newFields = new ArrayList<>(schema.fields());
AtomicInteger highestFieldId = new
AtomicInteger(schema.highestFieldId());
@@ -207,8 +204,10 @@ public class SchemaManager implements Serializable {
AddColumn addColumn = (AddColumn) change;
SchemaChange.Move move = addColumn.move();
if (newFields.stream().anyMatch(f ->
f.name().equals(addColumn.fieldName()))) {
- throw new Catalog.ColumnAlreadyExistException(
- identifier, addColumn.fieldName());
+ throw new IllegalArgumentException(
+ String.format(
+ "The column [%s] exists in the
table[%s].",
+ addColumn.fieldName(), tableRoot));
}
Preconditions.checkArgument(
addColumn.dataType().isNullable(),
@@ -242,7 +241,10 @@ public class SchemaManager implements Serializable {
RenameColumn rename = (RenameColumn) change;
validateNotPrimaryAndPartitionKey(schema,
rename.fieldName());
if (newFields.stream().anyMatch(f ->
f.name().equals(rename.newName()))) {
- throw new
Catalog.ColumnAlreadyExistException(identifier, rename.newName());
+ throw new IllegalArgumentException(
+ String.format(
+ "The column [%s] exists in the
table[%s].",
+ rename.newName(), tableRoot));
}
updateNestedColumn(
@@ -260,7 +262,10 @@ public class SchemaManager implements Serializable {
validateNotPrimaryAndPartitionKey(schema,
drop.fieldName());
if (!newFields.removeIf(
f -> f.name().equals(((DropColumn)
change).fieldName()))) {
- throw new Catalog.ColumnNotExistException(identifier,
drop.fieldName());
+ throw new IllegalArgumentException(
+ String.format(
+ "The column [%s] doesn't exist in the
table[%s].",
+ drop.fieldName(), tableRoot));
}
if (newFields.isEmpty()) {
throw new IllegalArgumentException("Cannot drop all
fields in table");
@@ -270,8 +275,8 @@ public class SchemaManager implements Serializable {
if (schema.partitionKeys().contains(update.fieldName())) {
throw new IllegalArgumentException(
String.format(
- "Cannot update partition column [%s]
type in the table [%s].",
- update.fieldName(),
identifier.getFullName()));
+ "Cannot update partition column [%s]
type in the table[%s].",
+ update.fieldName(), tableRoot));
}
updateColumn(
newFields,
@@ -398,8 +403,7 @@ public class SchemaManager implements Serializable {
List<DataField> newFields,
String[] updateFieldNames,
int index,
- Function<DataField, DataField> updateFunc)
- throws Catalog.ColumnNotExistException {
+ Function<DataField, DataField> updateFunc) {
boolean found = false;
for (int i = 0; i < newFields.size(); i++) {
DataField field = newFields.get(i);
@@ -425,16 +429,14 @@ public class SchemaManager implements Serializable {
}
}
if (!found) {
- throw new Catalog.ColumnNotExistException(
- identifier, Arrays.toString(updateFieldNames));
+ throw new RuntimeException("Can not find column: " +
Arrays.asList(updateFieldNames));
}
}
private void updateColumn(
List<DataField> newFields,
String updateFieldName,
- Function<DataField, DataField> updateFunc)
- throws Catalog.ColumnNotExistException {
+ Function<DataField, DataField> updateFunc) {
updateNestedColumn(newFields, new String[] {updateFieldName}, 0,
updateFunc);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index deb134dec..321cbbc28 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -439,7 +439,7 @@ public abstract class CatalogTestBase {
false))
.withMessage("Table test_db.non_existing_table does not
exist.");
- // Alter table adds a column throws ColumnAlreadyExistException when
column already exists
+ // Alter table adds a column throws Exception when column already
exists
assertThatThrownBy(
() ->
catalog.alterTable(
@@ -447,8 +447,8 @@ public abstract class CatalogTestBase {
Lists.newArrayList(
SchemaChange.addColumn("col1",
DataTypes.INT())),
false))
-
.hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class)
- .hasRootCauseMessage("Column col1 already exists in the
test_db.test_table table.");
+ .hasRootCauseInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("The column [col1] exists in the table");
}
@Test
@@ -476,8 +476,7 @@ public abstract class CatalogTestBase {
assertThat(table.rowType().getFieldIndex("col1")).isLessThan(0);
assertThat(table.rowType().getFieldIndex("new_col1")).isEqualTo(0);
- // Alter table renames a new column throws ColumnAlreadyExistException
when column already
- // exists
+ // Alter table renames a new column throws Exception when column
already exists
assertThatThrownBy(
() ->
catalog.alterTable(
@@ -485,11 +484,10 @@ public abstract class CatalogTestBase {
Lists.newArrayList(
SchemaChange.renameColumn("col1", "new_col1")),
false))
-
.hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class)
- .hasRootCauseMessage(
- "Column new_col1 already exists in the
test_db.test_table table.");
+ .hasRootCauseInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("The column [new_col1] exists in the
table");
- // Alter table renames a column throws ColumnNotExistException when
column does not exist
+ // Alter table renames a column throws Exception when column does not
exist
assertThatThrownBy(
() ->
catalog.alterTable(
@@ -498,9 +496,7 @@ public abstract class CatalogTestBase {
SchemaChange.renameColumn(
"non_existing_col",
"new_col2")),
false))
- .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
- .hasRootCauseMessage(
- "Column [non_existing_col] does not exist in the
test_db.test_table table.");
+ .hasMessageContaining("Can not find column:
[non_existing_col]");
}
@Test
@@ -536,7 +532,7 @@ public abstract class CatalogTestBase {
.hasRootCauseInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(" Cannot drop all fields in table");
- // Alter table drop a column ColumnNotExistException when column does
not exist
+ // Alter table drop a column throws Exception when column does not
exist
assertThatThrownBy(
() ->
catalog.alterTable(
@@ -544,9 +540,7 @@ public abstract class CatalogTestBase {
Lists.newArrayList(
SchemaChange.dropColumn("non_existing_col")),
false))
- .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
- .hasRootCauseMessage(
- "Column non_existing_col does not exist in the
test_db.test_table table.");
+ .hasMessageContaining("The column [non_existing_col] doesn't
exist in the table");
}
@Test
@@ -589,8 +583,7 @@ public abstract class CatalogTestBase {
.hasMessageContaining(
"Column type col1[DOUBLE] cannot be converted to
STRING without loosing information.");
- // Alter table update a column type throws ColumnNotExistException
when column does not
- // exist
+ // Alter table update a column type throws Exception when column does
not exist
assertThatThrownBy(
() ->
catalog.alterTable(
@@ -599,9 +592,7 @@ public abstract class CatalogTestBase {
SchemaChange.updateColumnType(
"non_existing_col",
DataTypes.INT())),
false))
- .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
- .hasRootCauseMessage(
- "Column [non_existing_col] does not exist in the
test_db.test_table table.");
+ .hasMessageContaining("Can not find column:
[non_existing_col]");
// Alter table update a column type throws Exception when column is
partition columns
assertThatThrownBy(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index ad6f514cc..f2578c15c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -18,8 +18,6 @@
package org.apache.paimon.table;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.DataFormatTestUtil;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -66,14 +64,12 @@ public class SchemaEvolutionTest {
private Path tablePath;
private SchemaManager schemaManager;
private String commitUser;
- private String tableFullName;
@BeforeEach
public void beforeEach() {
- tablePath = new Path(tempDir.toUri().toString(), "test_db/test_table");
+ tablePath = new Path(tempDir.toUri());
schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
commitUser = UUID.randomUUID().toString();
- tableFullName = Identifier.fromPath(tablePath).getFullName();
}
@Test
@@ -157,8 +153,8 @@ public class SchemaEvolutionTest {
Collections.singletonList(
SchemaChange.addColumn(
columnName, new
FloatType()))))
- .isInstanceOf(Catalog.ColumnAlreadyExistException.class)
- .hasMessage("Column %s already exists in the %s table.",
columnName, tableFullName);
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("The column [%s] exists in the table[%s].",
columnName, tablePath);
}
@Test
@@ -214,10 +210,9 @@ public class SchemaEvolutionTest {
schemaManager.commitChanges(
Collections.singletonList(
SchemaChange.renameColumn("f0", "f1"))))
- .isInstanceOf(Catalog.ColumnAlreadyExistException.class)
+ .isInstanceOf(IllegalArgumentException.class)
.hasMessage(
- String.format(
- "Column %s already exists in the %s table.",
"f1", tableFullName));
+ String.format("The column [%s] exists in the
table[%s].", "f1", tablePath));
}
@Test
@@ -271,11 +266,11 @@ public class SchemaEvolutionTest {
() ->
schemaManager.commitChanges(
Collections.singletonList(SchemaChange.dropColumn("f100"))))
- .isInstanceOf(Catalog.ColumnNotExistException.class)
+ .isInstanceOf(IllegalArgumentException.class)
.hasMessage(
String.format(
- "Column %s does not exist in the %s table.",
- "f100", tableFullName));
+ "The column [%s] doesn't exist in the
table[%s].",
+ "f100", tablePath));
assertThatThrownBy(
() ->
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index cce1ee7e1..44f748dfe 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -108,7 +107,7 @@ public class UpdatedDataFieldsProcessFunction extends
ProcessFunction<List<DataF
if (schemaChange instanceof SchemaChange.AddColumn) {
try {
schemaManager.commitChanges(schemaChange);
- } catch (Catalog.ColumnAlreadyExistException e) {
+ } catch (IllegalArgumentException e) {
// This is normal. For example when a table is split into
multiple database tables,
// all these tables will be added the same column. However
schemaManager can't
// handle duplicated column adds, so we just catch the
exception and log it.
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 7d132b4da..45d6b34e5 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
@@ -56,8 +55,6 @@ import java.util.concurrent.ThreadLocalRandom;
public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase {
@TempDir java.nio.file.Path tempDir;
- private static final String DATABASE_NAME = "test";
- private static final String TABLE_NAME = "test_tbl";
@Test
@Timeout(120)
@@ -72,26 +69,16 @@ public class FlinkCdcSyncTableSinkITCase extends
AbstractTestBase {
boolean enableFailure = random.nextBoolean();
TestTable testTable =
- new TestTable(TABLE_NAME, numEvents, numSchemaChanges,
numPartitions, numKeys);
+ new TestTable("test_tbl", numEvents, numSchemaChanges,
numPartitions, numKeys);
Path tablePath;
FileIO fileIO;
String failingName = UUID.randomUUID().toString();
-
if (enableFailure) {
- tablePath =
- new Path(
- FailingFileIO.getFailingPath(
- failingName,
- CatalogUtils.stringifyPath(
- tempDir.toString(), DATABASE_NAME,
TABLE_NAME)));
+ tablePath = new Path(FailingFileIO.getFailingPath(failingName,
tempDir.toString()));
fileIO = new FailingFileIO();
} else {
- tablePath =
- new Path(
- TraceableFileIO.SCHEME + "://",
- CatalogUtils.stringifyPath(
- tempDir.toString(), DATABASE_NAME,
TABLE_NAME));
+ tablePath = new Path(TraceableFileIO.SCHEME + "://" +
tempDir.toString());
fileIO = LocalFileIO.create();
}