This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5f69c84468220e52557c0ac65706d1b79e946d3b
Author: Jane Chan <[email protected]>
AuthorDate: Thu Dec 30 15:29:20 2021 +0800

    [FLINK-25176][table] Fix typos and migrate SqlToOperationConverterTest to 
use AssertJ
---
 .../table/operations/UseCatalogOperation.java      |   2 +-
 .../ddl/AlterTableDropConstraintOperation.java     |   4 +-
 .../operations/SqlToOperationConverter.java        |   3 +-
 .../plan/abilities/sink/PartitioningSpec.java      |   3 +-
 .../operations/SqlToOperationConverterTest.java    | 592 +++++++++++----------
 5 files changed, 315 insertions(+), 289 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java
index a15d50f..c442fc5 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java
@@ -33,6 +33,6 @@ public class UseCatalogOperation implements UseOperation {
 
     @Override
     public String asSummaryString() {
-        return String.format("USE CATALOGS %s", catalogName);
+        return String.format("USE CATALOG %s", catalogName);
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java
index 10f8c1e..b3f7c7d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java
@@ -36,6 +36,8 @@ public class AlterTableDropConstraintOperation extends 
AlterTableOperation {
 
     @Override
     public String asSummaryString() {
-        return String.format("ALTER TABLE %s DROP CONSTRAINT %s", 
tableIdentifier, constraintName);
+        return String.format(
+                "ALTER TABLE %s DROP CONSTRAINT %s",
+                tableIdentifier.asSummaryString(), constraintName);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index a2b9cfe..a946308 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -392,8 +392,7 @@ public class SqlToOperationConverter {
         if (!optionalCatalogTable.isPresent() || 
optionalCatalogTable.get().isTemporary()) {
             throw new ValidationException(
                     String.format(
-                            "Table %s doesn't exist or is a temporary table.",
-                            tableIdentifier.toString()));
+                            "Table %s doesn't exist or is a temporary table.", 
tableIdentifier));
         }
         CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
         if (baseTable instanceof CatalogView) {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/PartitioningSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/PartitioningSpec.java
index 0ba8d0d..bf9b8d7 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/PartitioningSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/PartitioningSpec.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.planner.plan.abilities.sink;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
-import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -33,7 +32,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A sub-class of {@link SinkAbilitySpec} that can not only 
serialize/deserialize the partition
- * to/from JSON, but also can write partitioned data for {@link 
SupportsWritingMetadata}.
+ * to/from JSON, but also can write partitioned data for {@link 
SupportsPartitioning}.
  */
 @JsonTypeName("Partitioning")
 public class PartitioningSpec implements SinkAbilitySpec {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index bcf33a0..97d4ae3 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -87,12 +87,10 @@ import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.table.utils.ExpressionResolverMocks;
 
 import org.apache.calcite.sql.SqlNode;
+import org.assertj.core.api.HamcrestCondition;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import javax.annotation.Nullable;
 
@@ -111,14 +109,8 @@ import static 
org.apache.flink.table.planner.utils.OperationMatchers.isCreateTab
 import static 
org.apache.flink.table.planner.utils.OperationMatchers.partitionedBy;
 import static 
org.apache.flink.table.planner.utils.OperationMatchers.withOptions;
 import static 
org.apache.flink.table.planner.utils.OperationMatchers.withSchema;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test cases for {@link SqlToOperationConverter}. */
 public class SqlToOperationConverterTest {
@@ -157,8 +149,6 @@ public class SqlToOperationConverterTest {
         return plannerContext;
     }
 
-    @Rule public ExpectedException thrown = ExpectedException.none();
-
     @Before
     public void before() throws TableAlreadyExistException, 
DatabaseNotExistException {
         catalogManager.initSchemaResolver(
@@ -193,29 +183,31 @@ public class SqlToOperationConverterTest {
     public void testUseCatalog() {
         final String sql = "USE CATALOG cat1";
         Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof UseCatalogOperation;
-        assertEquals("cat1", ((UseCatalogOperation) 
operation).getCatalogName());
+        assertThat(operation).isInstanceOf(UseCatalogOperation.class);
+        assertThat(((UseCatalogOperation) 
operation).getCatalogName()).isEqualTo("cat1");
+        assertThat(operation.asSummaryString()).isEqualTo("USE CATALOG cat1");
     }
 
     @Test
     public void testUseDatabase() {
         final String sql1 = "USE db1";
         Operation operation1 = parse(sql1, SqlDialect.DEFAULT);
-        assert operation1 instanceof UseDatabaseOperation;
-        assertEquals("builtin", ((UseDatabaseOperation) 
operation1).getCatalogName());
-        assertEquals("db1", ((UseDatabaseOperation) 
operation1).getDatabaseName());
+        assertThat(operation1).isInstanceOf(UseDatabaseOperation.class);
+        assertThat(((UseDatabaseOperation) 
operation1).getCatalogName()).isEqualTo("builtin");
+        assertThat(((UseDatabaseOperation) 
operation1).getDatabaseName()).isEqualTo("db1");
 
         final String sql2 = "USE cat1.db1";
         Operation operation2 = parse(sql2, SqlDialect.DEFAULT);
-        assert operation2 instanceof UseDatabaseOperation;
-        assertEquals("cat1", ((UseDatabaseOperation) 
operation2).getCatalogName());
-        assertEquals("db1", ((UseDatabaseOperation) 
operation2).getDatabaseName());
+        assertThat(operation2).isInstanceOf(UseDatabaseOperation.class);
+        assertThat(((UseDatabaseOperation) 
operation2).getCatalogName()).isEqualTo("cat1");
+        assertThat(((UseDatabaseOperation) 
operation2).getDatabaseName()).isEqualTo("db1");
     }
 
-    @Test(expected = ValidationException.class)
+    @Test
     public void testUseDatabaseWithException() {
         final String sql = "USE cat1.db1.tbl1";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
+        assertThatThrownBy(() -> parse(sql, SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class);
     }
 
     @Test
@@ -244,17 +236,17 @@ public class SqlToOperationConverterTest {
 
         for (int i = 0; i < createDatabaseSqls.length; i++) {
             Operation operation = parse(createDatabaseSqls[i], 
SqlDialect.DEFAULT);
-            assert operation instanceof CreateDatabaseOperation;
+            assertThat(operation).isInstanceOf(CreateDatabaseOperation.class);
             final CreateDatabaseOperation createDatabaseOperation =
                     (CreateDatabaseOperation) operation;
-            assertEquals(expectedCatalogs[i], 
createDatabaseOperation.getCatalogName());
-            assertEquals(expectedDatabase, 
createDatabaseOperation.getDatabaseName());
-            assertEquals(
-                    expectedComments[i], 
createDatabaseOperation.getCatalogDatabase().getComment());
-            assertEquals(expectedIgnoreIfExists[i], 
createDatabaseOperation.isIgnoreIfExists());
-            assertEquals(
-                    expectedProperties[i],
-                    
createDatabaseOperation.getCatalogDatabase().getProperties());
+            
assertThat(createDatabaseOperation.getCatalogName()).isEqualTo(expectedCatalogs[i]);
+            
assertThat(createDatabaseOperation.getDatabaseName()).isEqualTo(expectedDatabase);
+            
assertThat(createDatabaseOperation.getCatalogDatabase().getComment())
+                    .isEqualTo(expectedComments[i]);
+            assertThat(createDatabaseOperation.isIgnoreIfExists())
+                    .isEqualTo(expectedIgnoreIfExists[i]);
+            
assertThat(createDatabaseOperation.getCatalogDatabase().getProperties())
+                    .isEqualTo(expectedProperties[i]);
         }
     }
 
@@ -274,12 +266,12 @@ public class SqlToOperationConverterTest {
 
         for (int i = 0; i < dropDatabaseSqls.length; i++) {
             Operation operation = parse(dropDatabaseSqls[i], 
SqlDialect.DEFAULT);
-            assert operation instanceof DropDatabaseOperation;
+            assertThat(operation).isInstanceOf(DropDatabaseOperation.class);
             final DropDatabaseOperation dropDatabaseOperation = 
(DropDatabaseOperation) operation;
-            assertEquals(expectedCatalogs[i], 
dropDatabaseOperation.getCatalogName());
-            assertEquals(expectedDatabase, 
dropDatabaseOperation.getDatabaseName());
-            assertEquals(expectedIfExists[i], 
dropDatabaseOperation.isIfExists());
-            assertEquals(expectedIsCascades[i], 
dropDatabaseOperation.isCascade());
+            
assertThat(dropDatabaseOperation.getCatalogName()).isEqualTo(expectedCatalogs[i]);
+            
assertThat(dropDatabaseOperation.getDatabaseName()).isEqualTo(expectedDatabase);
+            
assertThat(dropDatabaseOperation.isIfExists()).isEqualTo(expectedIfExists[i]);
+            
assertThat(dropDatabaseOperation.isCascade()).isEqualTo(expectedIsCascades[i]);
         }
     }
 
@@ -293,18 +285,18 @@ public class SqlToOperationConverterTest {
                         "db1", new CatalogDatabaseImpl(new HashMap<>(), 
"db1_comment"), true);
         final String sql = "alter database cat1.db1 set ('k1'='v1', 
'K2'='V2')";
         Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof AlterDatabaseOperation;
+        assertThat(operation).isInstanceOf(AlterDatabaseOperation.class);
         Map<String, String> properties = new HashMap<>();
         properties.put("k1", "v1");
         properties.put("K2", "V2");
-        assertEquals("db1", ((AlterDatabaseOperation) 
operation).getDatabaseName());
-        assertEquals("cat1", ((AlterDatabaseOperation) 
operation).getCatalogName());
-        assertEquals(
-                "db1_comment",
-                ((AlterDatabaseOperation) 
operation).getCatalogDatabase().getComment());
-        assertEquals(
-                properties,
-                ((AlterDatabaseOperation) 
operation).getCatalogDatabase().getProperties());
+
+        AlterDatabaseOperation alterDatabaseOperation = 
(AlterDatabaseOperation) operation;
+        assertThat(alterDatabaseOperation.getDatabaseName()).isEqualTo("db1");
+        assertThat(alterDatabaseOperation.getCatalogName()).isEqualTo("cat1");
+        assertThat(alterDatabaseOperation.getCatalogDatabase().getComment())
+                .isEqualTo("db1_comment");
+        assertThat(alterDatabaseOperation.getCatalogDatabase().getProperties())
+                .isEqualTo(properties);
     }
 
     @Test
@@ -316,11 +308,11 @@ public class SqlToOperationConverterTest {
         expectedOptions.put("k2", "v2");
 
         Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof LoadModuleOperation;
+        assertThat(operation).isInstanceOf(LoadModuleOperation.class);
         final LoadModuleOperation loadModuleOperation = (LoadModuleOperation) 
operation;
 
-        assertEquals(expectedModuleName, loadModuleOperation.getModuleName());
-        assertEquals(expectedOptions, loadModuleOperation.getOptions());
+        
assertThat(loadModuleOperation.getModuleName()).isEqualTo(expectedModuleName);
+        
assertThat(loadModuleOperation.getOptions()).isEqualTo(expectedOptions);
     }
 
     @Test
@@ -329,10 +321,11 @@ public class SqlToOperationConverterTest {
         final String expectedModuleName = "dummy";
 
         Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof UnloadModuleOperation;
+        assertThat(operation).isInstanceOf(UnloadModuleOperation.class);
+
         final UnloadModuleOperation unloadModuleOperation = 
(UnloadModuleOperation) operation;
 
-        assertEquals(expectedModuleName, 
unloadModuleOperation.getModuleName());
+        
assertThat(unloadModuleOperation.getModuleName()).isEqualTo(expectedModuleName);
     }
 
     @Test
@@ -341,11 +334,12 @@ public class SqlToOperationConverterTest {
         final List<String> expectedModuleNames = 
Collections.singletonList("dummy");
 
         Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof UseModulesOperation;
+        assertThat(operation).isInstanceOf(UseModulesOperation.class);
+
         final UseModulesOperation useModulesOperation = (UseModulesOperation) 
operation;
 
-        assertEquals(expectedModuleNames, 
useModulesOperation.getModuleNames());
-        assertEquals("USE MODULES: [dummy]", 
useModulesOperation.asSummaryString());
+        
assertThat(useModulesOperation.getModuleNames()).isEqualTo(expectedModuleNames);
+        assertThat(useModulesOperation.asSummaryString()).isEqualTo("USE 
MODULES: [dummy]");
     }
 
     @Test
@@ -354,33 +348,34 @@ public class SqlToOperationConverterTest {
         final List<String> expectedModuleNames = Arrays.asList("x", "y", "z");
 
         Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof UseModulesOperation;
+        assertThat(operation).isInstanceOf(UseModulesOperation.class);
+
         final UseModulesOperation useModulesOperation = (UseModulesOperation) 
operation;
 
-        assertEquals(expectedModuleNames, 
useModulesOperation.getModuleNames());
-        assertEquals("USE MODULES: [x, y, z]", 
useModulesOperation.asSummaryString());
+        
assertThat(useModulesOperation.getModuleNames()).isEqualTo(expectedModuleNames);
+        assertThat(useModulesOperation.asSummaryString()).isEqualTo("USE 
MODULES: [x, y, z]");
     }
 
     @Test
     public void testShowModules() {
         final String sql = "SHOW MODULES";
         Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof ShowModulesOperation;
+        assertThat(operation).isInstanceOf(ShowModulesOperation.class);
         final ShowModulesOperation showModulesOperation = 
(ShowModulesOperation) operation;
 
-        assertFalse(showModulesOperation.requireFull());
-        assertEquals("SHOW MODULES", showModulesOperation.asSummaryString());
+        assertThat(showModulesOperation.requireFull()).isFalse();
+        assertThat(showModulesOperation.asSummaryString()).isEqualTo("SHOW 
MODULES");
     }
 
     @Test
     public void testShowFullModules() {
         final String sql = "SHOW FULL MODULES";
         Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof ShowModulesOperation;
+        assertThat(operation).isInstanceOf(ShowModulesOperation.class);
         final ShowModulesOperation showModulesOperation = 
(ShowModulesOperation) operation;
 
-        assertTrue(showModulesOperation.requireFull());
-        assertEquals("SHOW FULL MODULES", 
showModulesOperation.asSummaryString());
+        assertThat(showModulesOperation.requireFull()).isTrue();
+        assertThat(showModulesOperation.asSummaryString()).isEqualTo("SHOW 
FULL MODULES");
     }
 
     @Test
@@ -409,20 +404,20 @@ public class SqlToOperationConverterTest {
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
         Operation operation = parse(sql, planner, parser);
-        assert operation instanceof CreateTableOperation;
+        assertThat(operation).isInstanceOf(CreateTableOperation.class);
         CreateTableOperation op = (CreateTableOperation) operation;
         CatalogTable catalogTable = op.getCatalogTable();
-        assertEquals(Arrays.asList("a", "d"), catalogTable.getPartitionKeys());
-        assertArrayEquals(
-                catalogTable.getSchema().getFieldNames(), new String[] {"a", 
"b", "c", "d"});
-        assertArrayEquals(
-                catalogTable.getSchema().getFieldDataTypes(),
-                new DataType[] {
-                    DataTypes.BIGINT(),
-                    DataTypes.VARCHAR(Integer.MAX_VALUE),
-                    DataTypes.INT(),
-                    DataTypes.VARCHAR(Integer.MAX_VALUE)
-                });
+        
assertThat(catalogTable.getPartitionKeys()).hasSameElementsAs(Arrays.asList("a",
 "d"));
+        assertThat(catalogTable.getSchema().getFieldNames())
+                .isEqualTo(new String[] {"a", "b", "c", "d"});
+        assertThat(catalogTable.getSchema().getFieldDataTypes())
+                .isEqualTo(
+                        new DataType[] {
+                            DataTypes.BIGINT(),
+                            DataTypes.VARCHAR(Integer.MAX_VALUE),
+                            DataTypes.INT(),
+                            DataTypes.VARCHAR(Integer.MAX_VALUE)
+                        });
     }
 
     @Test
@@ -441,25 +436,25 @@ public class SqlToOperationConverterTest {
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
         Operation operation = parse(sql, planner, parser);
-        assert operation instanceof CreateTableOperation;
+        assertThat(operation).isInstanceOf(CreateTableOperation.class);
         CreateTableOperation op = (CreateTableOperation) operation;
         CatalogTable catalogTable = op.getCatalogTable();
         TableSchema tableSchema = catalogTable.getSchema();
         assertThat(
-                tableSchema
-                        .getPrimaryKey()
-                        .map(UniqueConstraint::asSummaryString)
-                        .orElse("fakeVal"),
-                is("CONSTRAINT ct1 PRIMARY KEY (a, b)"));
-        assertArrayEquals(new String[] {"a", "b", "c", "d"}, 
tableSchema.getFieldNames());
-        assertArrayEquals(
-                new DataType[] {
-                    DataTypes.BIGINT().notNull(),
-                    DataTypes.STRING().notNull(),
-                    DataTypes.INT(),
-                    DataTypes.STRING()
-                },
-                tableSchema.getFieldDataTypes());
+                        tableSchema
+                                .getPrimaryKey()
+                                .map(UniqueConstraint::asSummaryString)
+                                .orElse("fakeVal"))
+                .isEqualTo("CONSTRAINT ct1 PRIMARY KEY (a, b)");
+        assertThat(tableSchema.getFieldNames()).isEqualTo(new String[] {"a", 
"b", "c", "d"});
+        assertThat(tableSchema.getFieldDataTypes())
+                .isEqualTo(
+                        new DataType[] {
+                            DataTypes.BIGINT().notNull(),
+                            DataTypes.STRING().notNull(),
+                            DataTypes.INT(),
+                            DataTypes.STRING()
+                        });
     }
 
     @Test
@@ -479,13 +474,13 @@ public class SqlToOperationConverterTest {
                         + ")\n";
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Flink doesn't support ENFORCED mode for PRIMARY KEY "
-                        + "constraint. ENFORCED/NOT ENFORCED  controls if the 
constraint "
-                        + "checks are performed on the incoming/outgoing data. 
"
-                        + "Flink does not own the data therefore the only 
supported mode is the NOT ENFORCED mode");
-        parse(sql, planner, parser);
+        assertThatThrownBy(() -> parse(sql, planner, parser))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Flink doesn't support ENFORCED mode for PRIMARY KEY "
+                                + "constraint. ENFORCED/NOT ENFORCED  controls 
if the constraint "
+                                + "checks are performed on the 
incoming/outgoing data. "
+                                + "Flink does not own the data therefore the 
only supported mode is the NOT ENFORCED mode");
     }
 
     @Test
@@ -503,18 +498,14 @@ public class SqlToOperationConverterTest {
                         + ")\n";
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        thrown.expect(UnsupportedOperationException.class);
-        thrown.expectMessage("UNIQUE constraint is not supported yet");
-        parse(sql, planner, parser);
+        assertThatThrownBy(() -> parse(sql, planner, parser))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("UNIQUE constraint is not supported 
yet");
     }
 
     @Test
     public void testPrimaryKeyOnGeneratedColumn() {
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Could not create a PRIMARY KEY with column 'c' at line 5, 
column 34.\n"
-                        + "A PRIMARY KEY constraint must be declared on 
physical columns.");
-        final String sql2 =
+        final String sql =
                 "CREATE TABLE tbl1 (\n"
                         + "  a bigint not null,\n"
                         + "  b varchar not null,\n"
@@ -524,15 +515,17 @@ public class SqlToOperationConverterTest {
                         + "    'connector' = 'kafka',\n"
                         + "    'kafka.topic' = 'log.test'\n"
                         + ")\n";
-        parseAndConvert(sql2);
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Could not create a PRIMARY KEY with column 'c' at 
line 5, column 34.\n"
+                                + "A PRIMARY KEY constraint must be declared 
on physical columns.");
     }
 
     @Test
     public void testPrimaryKeyNonExistentColumn() {
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Primary key column 'd' is not defined in the schema at line 
5, column 34");
-        final String sql2 =
+        final String sql =
                 "CREATE TABLE tbl1 (\n"
                         + "  a bigint not null,\n"
                         + "  b varchar not null,\n"
@@ -542,7 +535,10 @@ public class SqlToOperationConverterTest {
                         + "    'connector' = 'kafka',\n"
                         + "    'kafka.topic' = 'log.test'\n"
                         + ")\n";
-        parseAndConvert(sql2);
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Primary key column 'd' is not defined in the schema 
at line 5, column 34");
     }
 
     @Test
@@ -560,9 +556,9 @@ public class SqlToOperationConverterTest {
         final FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
         SqlNode node = parser.parse(sql);
-        assert node instanceof SqlCreateTable;
+        assertThat(node).isInstanceOf(SqlCreateTable.class);
         Operation operation = SqlToOperationConverter.convert(planner, 
catalogManager, node).get();
-        assert operation instanceof CreateTableOperation;
+        assertThat(operation).isInstanceOf(CreateTableOperation.class);
         CreateTableOperation op = (CreateTableOperation) operation;
         CatalogTable catalogTable = op.getCatalogTable();
         Map<String, String> options =
@@ -574,7 +570,7 @@ public class SqlToOperationConverterTest {
                         + "a.b-c-d.*=adad, "
                         + "a.b-c-d.e-f.g=ada, "
                         + "a.b-c-d.e-f1231.g=ada}";
-        assertEquals(expected, sortedProperties.toString());
+        assertThat(sortedProperties.toString()).isEqualTo(expected);
     }
 
     @Test
@@ -601,15 +597,6 @@ public class SqlToOperationConverterTest {
         checkExplainSql(sql);
     }
 
-    private void checkExplainSql(String sql) {
-        final FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        SqlNode node = parser.parse(sql);
-        assert node instanceof SqlRichExplain;
-        Operation operation = SqlToOperationConverter.convert(planner, 
catalogManager, node).get();
-        assert operation instanceof ExplainOperation;
-    }
-
     @Test
     public void testCreateTableWithWatermark()
             throws FunctionAlreadyExistException, DatabaseNotExistException {
@@ -628,9 +615,10 @@ public class SqlToOperationConverterTest {
         final FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
         SqlNode node = parser.parse(sql);
-        assert node instanceof SqlCreateTable;
+        assertThat(node).isInstanceOf(SqlCreateTable.class);
+
         Operation operation = SqlToOperationConverter.convert(planner, 
catalogManager, node).get();
-        assert operation instanceof CreateTableOperation;
+        assertThat(operation).isInstanceOf(CreateTableOperation.class);
         CreateTableOperation op = (CreateTableOperation) operation;
         CatalogTable catalogTable = op.getCatalogTable();
         Map<String, String> properties = catalogTable.toProperties();
@@ -647,7 +635,7 @@ public class SqlToOperationConverterTest {
                 "`builtin`.`default`.`myfunc`(`c`, 1) - INTERVAL '5' SECOND");
         expected.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
         expected.put("connector.type", "kafka");
-        assertEquals(expected, properties);
+        assertThat(properties).isEqualTo(expected);
     }
 
     @Test
@@ -679,18 +667,22 @@ public class SqlToOperationConverterTest {
                         + "like sourceTable";
         Operation operation = parseAndConvert(sql);
 
-        assertThat(
-                operation,
-                isCreateTableOperation(
-                        withSchema(
-                                Schema.newBuilder()
-                                        .column("f0", 
DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                        .column("a", DataTypes.INT())
-                                        .watermark("f1", "`f1` - INTERVAL '5' 
SECOND")
-                                        .build()),
-                        withOptions(entry("connector.type", "kafka"), 
entry("format.type", "json")),
-                        partitionedBy("a", "f0")));
+        assertThat(operation)
+                .is(
+                        new HamcrestCondition<>(
+                                isCreateTableOperation(
+                                        withSchema(
+                                                Schema.newBuilder()
+                                                        .column("f0", 
DataTypes.INT().notNull())
+                                                        .column("f1", 
DataTypes.TIMESTAMP(3))
+                                                        .column("a", 
DataTypes.INT())
+                                                        .watermark(
+                                                                "f1", "`f1` - 
INTERVAL '5' SECOND")
+                                                        .build()),
+                                        withOptions(
+                                                entry("connector.type", 
"kafka"),
+                                                entry("format.type", "json")),
+                                        partitionedBy("a", "f0"))));
     }
 
     @Test
@@ -712,16 +704,18 @@ public class SqlToOperationConverterTest {
         final String sql = "create table mytable like 
`builtin`.`default`.sourceTable";
         Operation operation = parseAndConvert(sql);
 
-        assertThat(
-                operation,
-                isCreateTableOperation(
-                        withSchema(
-                                Schema.newBuilder()
-                                        .column("f0", 
DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                        .build()),
-                        withOptions(
-                                entry("connector.type", "kafka"), 
entry("format.type", "json"))));
+        assertThat(operation)
+                .is(
+                        new HamcrestCondition<>(
+                                isCreateTableOperation(
+                                        withSchema(
+                                                Schema.newBuilder()
+                                                        .column("f0", 
DataTypes.INT().notNull())
+                                                        .column("f1", 
DataTypes.TIMESTAMP(3))
+                                                        .build()),
+                                        withOptions(
+                                                entry("connector.type", 
"kafka"),
+                                                entry("format.type", 
"json")))));
     }
 
     @Test
@@ -760,18 +754,22 @@ public class SqlToOperationConverterTest {
                         + ")";
         Operation operation = parseAndConvert(sql);
 
-        assertThat(
-                operation,
-                isCreateTableOperation(
-                        withSchema(
-                                Schema.newBuilder()
-                                        .column("f0", 
DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                        .column("a", DataTypes.INT())
-                                        .watermark("f1", "`f1` - INTERVAL '5' 
SECOND")
-                                        .build()),
-                        withOptions(entry("connector.type", "kafka"), 
entry("format.type", "json")),
-                        partitionedBy("a", "f0")));
+        assertThat(operation)
+                .is(
+                        new HamcrestCondition<>(
+                                isCreateTableOperation(
+                                        withSchema(
+                                                Schema.newBuilder()
+                                                        .column("f0", 
DataTypes.INT().notNull())
+                                                        .column("f1", 
DataTypes.TIMESTAMP(3))
+                                                        .column("a", 
DataTypes.INT())
+                                                        .watermark(
+                                                                "f1", "`f1` - 
INTERVAL '5' SECOND")
+                                                        .build()),
+                                        withOptions(
+                                                entry("connector.type", 
"kafka"),
+                                                entry("format.type", "json")),
+                                        partitionedBy("a", "f0"))));
     }
 
     @Test
@@ -779,10 +777,10 @@ public class SqlToOperationConverterTest {
         final String sql =
                 "create table derivedTable(\n" + "  a int\n" + ")\n" + 
"PARTITIONED BY (f3)";
 
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Partition column 'f3' not defined in the table schema. 
Available columns: ['a']");
-        parseAndConvert(sql);
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Partition column 'f3' not defined in the table 
schema. Available columns: ['a']");
     }
 
     @Test
@@ -803,10 +801,10 @@ public class SqlToOperationConverterTest {
                         + "PARTITIONED BY (f3)\n"
                         + "like sourceTable";
 
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Partition column 'f3' not defined in the table schema. 
Available columns: ['f0', 'a']");
-        parseAndConvert(sql);
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Partition column 'f3' not defined in the table 
schema. Available columns: ['f0', 'a']");
     }
 
     @Test
@@ -817,12 +815,12 @@ public class SqlToOperationConverterTest {
                         + "  watermark for f1 as `f1` - interval '5' second\n"
                         + ")";
 
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "The rowtime attribute field 'f1' is not defined in the table 
schema,"
-                        + " at line 3, column 17\n"
-                        + "Available fields: ['a']");
-        parseAndConvert(sql);
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The rowtime attribute field 'f1' is not defined in 
the table schema,"
+                                + " at line 3, column 17\n"
+                                + "Available fields: ['a']");
     }
 
     @Test
@@ -843,12 +841,12 @@ public class SqlToOperationConverterTest {
                         + ")\n"
                         + "like sourceTable";
 
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "The rowtime attribute field 'f1' is not defined in the table 
schema,"
-                        + " at line 3, column 17\n"
-                        + "Available fields: ['f0', 'a']");
-        parseAndConvert(sql);
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The rowtime attribute field 'f1' is not defined in 
the table schema,"
+                                + " at line 3, column 17\n"
+                                + "Available fields: ['f0', 'a']");
     }
 
     @Test
@@ -875,13 +873,13 @@ public class SqlToOperationConverterTest {
                         + ")\n"
                         + "like sourceTable";
 
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "The rowtime attribute field 'f1.t' is not defined in the 
table schema,"
-                        + " at line 3, column 20\n"
-                        + "Nested field 't' was not found in a composite type:"
-                        + " ROW<`tmstmp` TIMESTAMP(3)>.");
-        parseAndConvert(sql);
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The rowtime attribute field 'f1.t' is not defined in 
the table schema,"
+                                + " at line 3, column 20\n"
+                                + "Nested field 't' was not found in a 
composite type:"
+                                + " ROW<`tmstmp` TIMESTAMP(3)>.");
     }
 
     @Test
@@ -890,11 +888,11 @@ public class SqlToOperationConverterTest {
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
         Operation operation = parse(sql, planner, parser);
-        assert operation instanceof CatalogSinkModifyOperation;
+        assertThat(operation).isInstanceOf(CatalogSinkModifyOperation.class);
         CatalogSinkModifyOperation sinkModifyOperation = 
(CatalogSinkModifyOperation) operation;
         final Map<String, String> expectedStaticPartitions = new HashMap<>();
         expectedStaticPartitions.put("a", "1");
-        assertEquals(expectedStaticPartitions, 
sinkModifyOperation.getStaticPartitions());
+        
assertThat(sinkModifyOperation.getStaticPartitions()).isEqualTo(expectedStaticPartitions);
     }
 
     @Test
@@ -905,12 +903,12 @@ public class SqlToOperationConverterTest {
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
         Operation operation = parse(sql, planner, parser);
-        assert operation instanceof CatalogSinkModifyOperation;
+        assertThat(operation).isInstanceOf(CatalogSinkModifyOperation.class);
         CatalogSinkModifyOperation sinkModifyOperation = 
(CatalogSinkModifyOperation) operation;
         Map<String, String> dynamicOptions = 
sinkModifyOperation.getDynamicOptions();
-        assertNotNull(dynamicOptions);
-        assertThat(dynamicOptions.size(), is(2));
-        assertThat(dynamicOptions.toString(), is("{k1=v1, k2=v2}"));
+        assertThat(dynamicOptions).isNotNull();
+        assertThat(dynamicOptions.size()).isEqualTo(2);
+        assertThat(dynamicOptions.toString()).isEqualTo("{k1=v1, k2=v2}");
     }
 
     @Test
@@ -918,9 +916,10 @@ public class SqlToOperationConverterTest {
         final String sql = "select * from t1 /*+ OPTIONS('opt1', 'opt2') */";
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        thrown.expect(AssertionError.class);
-        thrown.expectMessage("Hint [OPTIONS] only support " + "non empty key 
value options");
-        parse(sql, planner, parser);
+        assertThatThrownBy(() -> parse(sql, planner, parser))
+                .isInstanceOf(AssertionError.class)
+                .hasMessageContaining(
+                        "Hint [OPTIONS] only support " + "non empty key value 
options");
     }
 
     @Test // TODO: tweak the tests when FLINK-13604 is fixed.
@@ -1069,11 +1068,11 @@ public class SqlToOperationConverterTest {
         final FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
         SqlNode node = parser.parse(sql);
-        assert node instanceof SqlCreateTable;
+        assertThat(node).isInstanceOf(SqlCreateTable.class);
         Operation operation = SqlToOperationConverter.convert(planner, 
catalogManager, node).get();
         TableSchema schema = ((CreateTableOperation) 
operation).getCatalogTable().getSchema();
         Object[] expectedDataTypes = testItems.stream().map(item -> 
item.expectedType).toArray();
-        assertArrayEquals(expectedDataTypes, schema.getFieldDataTypes());
+        assertThat(schema.getFieldDataTypes()).isEqualTo(expectedDataTypes);
     }
 
     @Test
@@ -1100,23 +1099,22 @@ public class SqlToOperationConverterTest {
                 ObjectIdentifier.of("builtin", "default", "my_udf3"), 
Func8$.MODULE$);
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         Operation operation = parse(sql, planner, 
getParserBySqlDialect(SqlDialect.DEFAULT));
-        assert operation instanceof CreateTableOperation;
+        assertThat(operation).isInstanceOf(CreateTableOperation.class);
         CreateTableOperation op = (CreateTableOperation) operation;
         CatalogTable catalogTable = op.getCatalogTable();
-        assertArrayEquals(
-                new String[] {"a", "b", "c", "d", "e", "f", "g"},
-                catalogTable.getSchema().getFieldNames());
-        assertArrayEquals(
-                new DataType[] {
-                    DataTypes.INT(),
-                    DataTypes.STRING(),
-                    DataTypes.INT(),
-                    DataTypes.STRING(),
-                    DataTypes.INT().notNull(),
-                    DataTypes.INT(),
-                    DataTypes.STRING()
-                },
-                catalogTable.getSchema().getFieldDataTypes());
+        assertThat(catalogTable.getSchema().getFieldNames())
+                .isEqualTo(new String[] {"a", "b", "c", "d", "e", "f", "g"});
+        assertThat(catalogTable.getSchema().getFieldDataTypes())
+                .isEqualTo(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.STRING(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT(),
+                            DataTypes.STRING()
+                        });
         String[] columnExpressions =
                 catalogTable.getSchema().getTableColumns().stream()
                         .filter(ComputedColumn.class::isInstance)
@@ -1131,7 +1129,7 @@ public class SqlToOperationConverterTest {
                     "`builtin`.`default`.`my_udf2`(`a`) + 1",
                     "`builtin`.`default`.`my_udf3`(`a`) || '##'"
                 };
-        assertArrayEquals(expected, columnExpressions);
+        assertThat(columnExpressions).isEqualTo(expected);
     }
 
     @Test
@@ -1151,7 +1149,7 @@ public class SqlToOperationConverterTest {
 
         final FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final Operation operation = parse(sql, planner, 
getParserBySqlDialect(SqlDialect.DEFAULT));
-        assert operation instanceof CreateTableOperation;
+        assertThat(operation).isInstanceOf(CreateTableOperation.class);
         final CreateTableOperation op = (CreateTableOperation) operation;
         final TableSchema actualSchema = op.getCatalogTable().getSchema();
 
@@ -1164,7 +1162,7 @@ public class SqlToOperationConverterTest {
                         .add(TableColumn.metadata("e", DataTypes.INT(), true))
                         .build();
 
-        assertEquals(expectedSchema, actualSchema);
+        assertThat(actualSchema).isEqualTo(expectedSchema);
     }
 
     @Test
@@ -1181,11 +1179,13 @@ public class SqlToOperationConverterTest {
         // test rename table converter
         for (int i = 0; i < renameTableSqls.length; i++) {
             Operation operation = parse(renameTableSqls[i], 
SqlDialect.DEFAULT);
-            assert operation instanceof AlterTableRenameOperation;
+            
assertThat(operation).isInstanceOf(AlterTableRenameOperation.class);
             final AlterTableRenameOperation alterTableRenameOperation =
                     (AlterTableRenameOperation) operation;
-            assertEquals(expectedIdentifier, 
alterTableRenameOperation.getTableIdentifier());
-            assertEquals(expectedNewIdentifier, 
alterTableRenameOperation.getNewTableIdentifier());
+            assertThat(alterTableRenameOperation.getTableIdentifier())
+                    .isEqualTo(expectedIdentifier);
+            assertThat(alterTableRenameOperation.getNewTableIdentifier())
+                    .isEqualTo(expectedNewIdentifier);
         }
         // test alter table options
         Operation operation =
@@ -1203,9 +1203,9 @@ public class SqlToOperationConverterTest {
         operation = parse("alter table cat1.db1.tb1 reset ('k')", 
SqlDialect.DEFAULT);
         assertAlterTableOptions(operation, expectedIdentifier, 
Collections.emptyMap());
 
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage("ALTER TABLE RESET does not support empty key");
-        parse("alter table cat1.db1.tb1 reset ()", SqlDialect.DEFAULT);
+        assertThatThrownBy(() -> parse("alter table cat1.db1.tb1 reset ()", 
SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("ALTER TABLE RESET does not support 
empty key");
     }
 
     @Test
@@ -1216,49 +1216,65 @@ public class SqlToOperationConverterTest {
                 parse(
                         "alter table tb1 add constraint ct1 primary key(a, b) 
not enforced",
                         SqlDialect.DEFAULT);
-        assert operation instanceof AlterTableAddConstraintOperation;
+        
assertThat(operation).isInstanceOf(AlterTableAddConstraintOperation.class);
         AlterTableAddConstraintOperation addConstraintOperation =
                 (AlterTableAddConstraintOperation) operation;
-        assertThat(
-                addConstraintOperation.asSummaryString(),
-                is(
+        assertThat(addConstraintOperation.asSummaryString())
+                .isEqualTo(
                         "ALTER TABLE ADD CONSTRAINT: (identifier: 
[`cat1`.`db1`.`tb1`], "
-                                + "constraintName: [ct1], columns: [a, b])"));
+                                + "constraintName: [ct1], columns: [a, b])");
         // Test alter table add pk on nullable column
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage("Could not create a PRIMARY KEY 'ct1'. Column 'c' 
is nullable.");
-        parse("alter table tb1 add constraint ct1 primary key(c) not 
enforced", SqlDialect.DEFAULT);
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add constraint ct1 
primary key(c) not enforced",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Could not create a PRIMARY KEY 'ct1'. Column 'c' is 
nullable.");
     }
 
     @Test
     public void testAlterTableAddPkConstraintEnforced() throws Exception {
         prepareTable(false);
         // Test alter table add enforced
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Flink doesn't support ENFORCED mode for PRIMARY KEY 
constraint. "
-                        + "ENFORCED/NOT ENFORCED  controls if the constraint 
checks are performed on the "
-                        + "incoming/outgoing data. Flink does not own the data 
therefore the "
-                        + "only supported mode is the NOT ENFORCED mode");
-        parse("alter table tb1 add constraint ct1 primary key(a, b)", 
SqlDialect.DEFAULT);
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add constraint ct1 
primary key(a, b)",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Flink doesn't support ENFORCED mode for PRIMARY KEY 
constraint. "
+                                + "ENFORCED/NOT ENFORCED  controls if the 
constraint checks are performed on the "
+                                + "incoming/outgoing data. Flink does not own 
the data therefore the "
+                                + "only supported mode is the NOT ENFORCED 
mode");
     }
 
     @Test
     public void testAlterTableAddUniqueConstraint() throws Exception {
         prepareTable(false);
         // Test alter add table constraint.
-        thrown.expect(UnsupportedOperationException.class);
-        thrown.expectMessage("UNIQUE constraint is not supported yet");
-        parse("alter table tb1 add constraint ct1 unique(a, b) not enforced", 
SqlDialect.DEFAULT);
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add constraint ct1 
unique(a, b) not enforced",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("UNIQUE constraint is not supported 
yet");
     }
 
     @Test
     public void testAlterTableAddUniqueConstraintEnforced() throws Exception {
         prepareTable(false);
         // Test alter table add enforced
-        thrown.expect(UnsupportedOperationException.class);
-        thrown.expectMessage("UNIQUE constraint is not supported yet");
-        parse("alter table tb1 add constraint ct1 unique(a, b)", 
SqlDialect.DEFAULT);
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add constraint ct1 
unique(a, b)",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("UNIQUE constraint is not supported 
yet");
     }
 
     @Test
@@ -1266,15 +1282,14 @@ public class SqlToOperationConverterTest {
         prepareTable(true);
         // Test alter table add enforced
         Operation operation = parse("alter table tb1 drop constraint ct1", 
SqlDialect.DEFAULT);
-        assert operation instanceof AlterTableDropConstraintOperation;
+        
assertThat(operation).isInstanceOf(AlterTableDropConstraintOperation.class);
         AlterTableDropConstraintOperation dropConstraint =
                 (AlterTableDropConstraintOperation) operation;
-        assertThat(
-                dropConstraint.asSummaryString(),
-                is("ALTER TABLE `cat1`.`db1`.`tb1` DROP CONSTRAINT ct1"));
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage("CONSTRAINT [ct2] does not exist");
-        parse("alter table tb1 drop constraint ct2", SqlDialect.DEFAULT);
+        assertThat(dropConstraint.asSummaryString())
+                .isEqualTo("ALTER TABLE cat1.db1.tb1 DROP CONSTRAINT ct1");
+        assertThatThrownBy(() -> parse("alter table tb1 drop constraint ct2", 
SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("CONSTRAINT [ct2] does not exist");
     }
 
     @Test
@@ -1316,7 +1331,7 @@ public class SqlToOperationConverterTest {
                         + ")";
 
         Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assertThat(operation, instanceOf(CreateViewOperation.class));
+        assertThat(operation).isInstanceOf(CreateViewOperation.class);
     }
 
     @Test
@@ -1344,29 +1359,29 @@ public class SqlToOperationConverterTest {
                         + "from sourceA /*+ OPTIONS('changelog-mode'='I') */";
 
         Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assertThat(operation, instanceOf(CreateViewOperation.class));
+        assertThat(operation).isInstanceOf(CreateViewOperation.class);
     }
 
     @Test
     public void testBeginStatementSet() {
         final String sql = "BEGIN STATEMENT SET";
         Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof BeginStatementSetOperation;
+        assertThat(operation).isInstanceOf(BeginStatementSetOperation.class);
         final BeginStatementSetOperation beginStatementSetOperation =
                 (BeginStatementSetOperation) operation;
 
-        assertEquals("BEGIN STATEMENT SET", 
beginStatementSetOperation.asSummaryString());
+        
assertThat(beginStatementSetOperation.asSummaryString()).isEqualTo("BEGIN 
STATEMENT SET");
     }
 
     @Test
     public void testEnd() {
         final String sql = "END";
         Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof EndStatementSetOperation;
+        assertThat(operation).isInstanceOf(EndStatementSetOperation.class);
         final EndStatementSetOperation endStatementSetOperation =
                 (EndStatementSetOperation) operation;
 
-        assertEquals("END", endStatementSetOperation.asSummaryString());
+        
assertThat(endStatementSetOperation.asSummaryString()).isEqualTo("END");
     }
 
     @Test
@@ -1375,7 +1390,7 @@ public class SqlToOperationConverterTest {
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
         Operation operation = parse(sql, planner, parser);
-        assertTrue(operation instanceof ExplainOperation);
+        assertThat(operation).isInstanceOf(ExplainOperation.class);
     }
 
     @Test
@@ -1384,7 +1399,7 @@ public class SqlToOperationConverterTest {
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
         Operation operation = parse(sql, planner, parser);
-        assertTrue(operation instanceof ExplainOperation);
+        assertThat(operation).isInstanceOf(ExplainOperation.class);
     }
 
     @Test
@@ -1402,7 +1417,7 @@ public class SqlToOperationConverterTest {
                                     (AddJarOperation)
                                             parser.parse(String.format("ADD 
JAR '%s'", jarPath))
                                                     .get(0);
-                            Assert.assertEquals(jarPath, operation.getPath());
+                            assertThat(operation.getPath()).isEqualTo(jarPath);
                         });
     }
 
@@ -1421,7 +1436,7 @@ public class SqlToOperationConverterTest {
                                     (RemoveJarOperation)
                                             parser.parse(String.format("REMOVE 
JAR '%s'", jarPath))
                                                     .get(0);
-                            Assert.assertEquals(jarPath, operation.getPath());
+                            assertThat(operation.getPath()).isEqualTo(jarPath);
                         });
     }
 
@@ -1429,34 +1444,36 @@ public class SqlToOperationConverterTest {
     public void testShowJars() {
         final String sql = "SHOW JARS";
         Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof ShowJarsOperation;
+        assertThat(operation).isInstanceOf(ShowJarsOperation.class);
         final ShowJarsOperation showModulesOperation = (ShowJarsOperation) 
operation;
-        assertEquals("SHOW JARS", showModulesOperation.asSummaryString());
+        assertThat(showModulesOperation.asSummaryString()).isEqualTo("SHOW 
JARS");
     }
 
     @Test
     public void testSet() {
         Operation operation1 = parse("SET", SqlDialect.DEFAULT);
-        assertTrue(operation1 instanceof SetOperation);
-        assertFalse(((SetOperation) operation1).getKey().isPresent());
-        assertFalse(((SetOperation) operation1).getValue().isPresent());
+        assertThat(operation1).isInstanceOf(SetOperation.class);
+        SetOperation setOperation1 = (SetOperation) operation1;
+        assertThat(setOperation1.getKey()).isNotPresent();
+        assertThat(setOperation1.getValue()).isNotPresent();
 
         Operation operation2 = parse("SET 'test-key' = 'test-value'", 
SqlDialect.DEFAULT);
-        assertTrue(operation2 instanceof SetOperation);
-        assertEquals("test-key", ((SetOperation) operation2).getKey().get());
-        assertEquals("test-value", ((SetOperation) 
operation2).getValue().get());
+        assertThat(operation2).isInstanceOf(SetOperation.class);
+        SetOperation setOperation2 = (SetOperation) operation2;
+        assertThat(setOperation2.getKey()).hasValue("test-key");
+        assertThat(setOperation2.getValue()).hasValue("test-value");
     }
 
     @Test
     public void testReset() {
         Operation operation1 = parse("RESET", SqlDialect.DEFAULT);
-        assertTrue(operation1 instanceof ResetOperation);
-        assertFalse(((ResetOperation) operation1).getKey().isPresent());
+        assertThat(operation1).isInstanceOf(ResetOperation.class);
+        assertThat(((ResetOperation) operation1).getKey()).isNotPresent();
 
         Operation operation2 = parse("RESET 'test-key'", SqlDialect.DEFAULT);
-        assertTrue(operation2 instanceof ResetOperation);
-        assertTrue(((ResetOperation) operation2).getKey().isPresent());
-        assertEquals("test-key", ((ResetOperation) operation2).getKey().get());
+        assertThat(operation2).isInstanceOf(ResetOperation.class);
+        assertThat(((ResetOperation) operation2).getKey()).isPresent();
+        assertThat(((ResetOperation) 
operation2).getKey()).hasValue("test-key");
     }
 
     // ~ Tool Methods 
----------------------------------------------------------
@@ -1473,28 +1490,36 @@ public class SqlToOperationConverterTest {
         return testItem;
     }
 
+    private void checkExplainSql(String sql) {
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        SqlNode node = parser.parse(sql);
+        assertThat(node).isInstanceOf(SqlRichExplain.class);
+        Operation operation = SqlToOperationConverter.convert(planner, 
catalogManager, node).get();
+        assertThat(operation).isInstanceOf(ExplainOperation.class);
+    }
+
     private void assertShowFunctions(
             String sql, String expectedSummary, FunctionScope expectedScope) {
         Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof ShowFunctionsOperation;
+        assertThat(operation).isInstanceOf(ShowFunctionsOperation.class);
+
         final ShowFunctionsOperation showFunctionsOperation = 
(ShowFunctionsOperation) operation;
 
-        assertEquals(expectedScope, showFunctionsOperation.getFunctionScope());
-        assertEquals(expectedSummary, 
showFunctionsOperation.asSummaryString());
+        
assertThat(showFunctionsOperation.getFunctionScope()).isEqualTo(expectedScope);
+        
assertThat(showFunctionsOperation.asSummaryString()).isEqualTo(expectedSummary);
     }
 
     private void assertAlterTableOptions(
             Operation operation,
             ObjectIdentifier expectedIdentifier,
             Map<String, String> expectedOptions) {
-        assert operation instanceof AlterTableOptionsOperation;
+        assertThat(operation).isInstanceOf(AlterTableOptionsOperation.class);
         final AlterTableOptionsOperation alterTableOptionsOperation =
                 (AlterTableOptionsOperation) operation;
-        assertEquals(expectedIdentifier, 
alterTableOptionsOperation.getTableIdentifier());
-        assertEquals(
-                expectedOptions.size(),
-                
alterTableOptionsOperation.getCatalogTable().getOptions().size());
-        assertEquals(expectedOptions, 
alterTableOptionsOperation.getCatalogTable().getOptions());
+        
assertThat(alterTableOptionsOperation.getTableIdentifier()).isEqualTo(expectedIdentifier);
+        assertThat(alterTableOptionsOperation.getCatalogTable().getOptions())
+                .isEqualTo(expectedOptions);
     }
 
     private Operation parse(String sql, FlinkPlannerImpl planner, 
CalciteParser parser) {
@@ -1528,7 +1553,8 @@ public class SqlToOperationConverterTest {
                         Collections.singletonMap("k", "v"));
         catalogManager.setCurrentCatalog("cat1");
         catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
+        ObjectPath tablePath = new ObjectPath("db1", "tb1");
+        catalog.createTable(tablePath, catalogTable, true);
     }
 
     private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {

Reply via email to