This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 938d3323 [FLINK-30706] Remove flink-table-common dependency for table store core
938d3323 is described below

commit 938d3323e39048b22b693716533910e66510ac9b
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jan 17 17:39:50 2023 +0800

    [FLINK-30706] Remove flink-table-common dependency for table store core
    
    This closes #485
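
    Note on the pattern in the diff: every type that table store core
    previously pulled in from flink-table-common is either replaced with a
    plain JDK type (ValidationException and TableException become
    IllegalArgumentException or RuntimeException) or re-homed under
    org.apache.flink.table.store (EncodingUtils, InstantiationUtil, the new
    Identifier). Below is a minimal, self-contained sketch of the exception
    half of that pattern; the class is illustrative only and not part of
    the commit, though its check mirrors the BinaryType constructor in the
    diff:

        // Illustrative sketch; not part of this commit.
        public final class LengthCheckSketch {

            private LengthCheckSketch() {}

            /** Mirrors the post-commit BinaryType check: JDK exception, no Flink dependency. */
            static void checkLength(int length, int minLength, int maxLength) {
                if (length < minLength) {
                    // Before this commit: org.apache.flink.table.api.ValidationException,
                    // which dragged flink-table-common onto the core classpath.
                    throw new IllegalArgumentException(
                            String.format(
                                    "Binary string length must be between %d and %d (both inclusive).",
                                    minLength, maxLength));
                }
            }

            public static void main(String[] args) {
                try {
                    checkLength(0, 1, Integer.MAX_VALUE);
                } catch (IllegalArgumentException e) {
                    System.out.println("rejected: " + e.getMessage());
                }
            }
        }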
---
 flink-table-store-codegen/pom.xml                  |   2 +-
 .../apache/flink/table/store/codegen/SortSpec.java |   4 +-
 .../CodeGeneratorContext.scala                     |   2 +-
 .../SortCodeGenerator.scala                        |   2 +-
 flink-table-store-common/pom.xml                   |   7 -
 .../flink/table/store/format/FileFormat.java       |   3 +-
 .../apache/flink/table/store/types/BinaryType.java |   4 +-
 .../apache/flink/table/store/types/CharType.java   |   4 +-
 .../apache/flink/table/store/types/DataField.java  |   4 +-
 .../table/store/types/DataTypeJsonParser.java      |   8 +-
 .../flink/table/store/types/DecimalType.java       |   6 +-
 .../table/store/types/LocalZonedTimestampType.java |   4 +-
 .../apache/flink/table/store/types/RowType.java    |   5 +-
 .../apache/flink/table/store/types/TimeType.java   |   4 +-
 .../flink/table/store/types/TimestampType.java     |   4 +-
 .../flink/table/store/types/VarBinaryType.java     |   4 +-
 .../flink/table/store/types/VarCharType.java       |   4 +-
 .../flink/table/store/utils/EncodingUtils.java     |  39 ++++
 .../flink/table/store/utils/InstantiationUtil.java | 199 +++++++++++++++++++++
 .../apache/flink/table/store/utils/TypeUtils.java  |   5 +-
 .../table/store/datagen/DataGenVisitorBase.java    |   3 +-
 .../store/datagen/RandomGeneratorVisitor.java      |   3 +-
 .../flink/table/store/types/DataTypesTest.java     |   6 +-
 .../flink/table/store/connector/FlinkCatalog.java  |  27 +--
 .../table/store/connector/sink/TableStoreSink.java |   8 +-
 .../store/connector/AppendOnlyTableITCase.java     |   3 +-
 .../table/store/connector/FlinkCatalogTest.java    |   5 +-
 .../store/connector/ReadWriteTableITCase.java      |   3 +-
 .../table/store/connector/RescaleBucketITCase.java |   3 +-
 flink-table-store-core/pom.xml                     |   7 -
 .../org/apache/flink/table/store/CoreOptions.java  |   3 +-
 .../table/store/file/catalog/AbstractCatalog.java  |  25 ++-
 .../flink/table/store/file/catalog/Catalog.java    |  69 ++++---
 .../store/file/catalog/FileSystemCatalog.java      |  39 ++--
 .../flink/table/store/file/catalog/Identifier.java |  88 +++++++++
 .../compact/aggregate/FieldAggregator.java         |   3 +-
 .../file/operation/AbstractFileStoreScan.java      |   3 +-
 .../flink/table/store/file/operation/Lock.java     |  14 +-
 .../store/file/utils/FileStorePathFactory.java     |   1 -
 .../table/store/file/utils/PartitionPathUtils.java | 123 +++++++++++++
 .../ContinuousDataFileSnapshotEnumerator.java      |   3 +-
 .../store/file/schema/DataTypeJsonParserTest.java  |   3 +-
 .../table/store/file/schema/SchemaManagerTest.java |   3 +-
 flink-table-store-format/pom.xml                   |  11 +-
 .../store/format/avro/AvroSchemaConverter.java     |   7 +-
 .../apache/flink/table/store/hive/HiveCatalog.java | 106 +++++------
 .../flink/table/store/spark/SparkCatalog.java      |  18 +-
 47 files changed, 649 insertions(+), 252 deletions(-)
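
The catalog half of the change swaps Flink's ObjectPath for a new store-owned
Identifier (added as .../file/catalog/Identifier.java above), with FlinkCatalog
bridging the two. A short sketch of that bridge, assuming only the Identifier
constructor and accessors visible in the diff below; it has the same shape as
the FlinkCatalog.toIdentifier helper this commit introduces:

    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.store.file.catalog.Identifier;

    public final class IdentifierBridgeSketch {

        private IdentifierBridgeSketch() {}

        // Copy the database and object name across, so that table store core
        // never has to see Flink's ObjectPath type.
        public static Identifier toIdentifier(ObjectPath path) {
            return new Identifier(path.getDatabaseName(), path.getObjectName());
        }
    }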

diff --git a/flink-table-store-codegen/pom.xml b/flink-table-store-codegen/pom.xml
index 3862c261..aaf0539d 100644
--- a/flink-table-store-codegen/pom.xml
+++ b/flink-table-store-codegen/pom.xml
@@ -40,7 +40,7 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
+            <artifactId>flink-core</artifactId>
             <version>${flink.version}</version>
         </dependency>
 
diff --git a/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/SortSpec.java b/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/SortSpec.java
index 75e23270..fb2dbbb2 100644
--- a/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/SortSpec.java
+++ b/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/SortSpec.java
@@ -23,8 +23,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /** {@link SortSpec} describes how the data will be sorted. */
 public class SortSpec {
 
@@ -34,7 +32,7 @@ public class SortSpec {
     private final SortFieldSpec[] fieldSpecs;
 
     public SortSpec(SortFieldSpec[] fieldSpecs) {
-        this.fieldSpecs = checkNotNull(fieldSpecs);
+        this.fieldSpecs = fieldSpecs;
     }
 
     /** Gets all {@link SortFieldSpec} in the SortSpec. */
diff --git a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/CodeGeneratorContext.scala b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/CodeGeneratorContext.scala
index e3d3bcc1..98b9ec27 100644
--- a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/CodeGeneratorContext.scala
+++ b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/CodeGeneratorContext.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.store.codegen
 import org.apache.flink.table.store.codegen.GenerateUtils.{newName, newNames}
 import org.apache.flink.table.store.data.InternalSerializers
 import org.apache.flink.table.store.types.DataType
-import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.table.store.utils.InstantiationUtil
 
 import scala.collection.mutable
 
diff --git a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala
index b5e322e5..144bb14a 100644
--- a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala
+++ b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala
@@ -42,7 +42,7 @@ class SortCodeGenerator(val input: RowType, val sortSpec: SortSpec) {
   /** Chunks for long, int, short, byte */
   private val POSSIBLE_CHUNK_SIZES = Array(8, 4, 2, 1)
 
-  /** For get${operator} set${operator} of [[org.apache.flink.core.memory.MemorySegment]] */
+  /** For get${operator} set${operator} of MemorySegment. */
   private val BYTE_OPERATOR_MAPPING = Map(8 -> "Long", 4 -> "Int", 2 -> "Short", 1 -> "")
 
   /** For primitive define */
diff --git a/flink-table-store-common/pom.xml b/flink-table-store-common/pom.xml
index c27e01a9..a2c9a87a 100644
--- a/flink-table-store-common/pom.xml
+++ b/flink-table-store-common/pom.xml
@@ -32,13 +32,6 @@ under the License.
     <name>Flink Table Store : Common</name>
 
     <dependencies>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-code-splitter</artifactId>
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
index 97f73486..e9baeda8 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.store.data.InternalRow;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.types.RowType;
@@ -98,7 +97,7 @@ public abstract class FileFormat {
                         fromIdentifier(identifier, options, FileFormat.class.getClassLoader())
                                 .orElseThrow(
                                         () ->
-                                                new ValidationException(
+                                                new RuntimeException(
                                                         String.format(
                                                                 "Could not 
find any factories that implement '%s' in the classpath.",
                                                                 
FileFormatFactory.class
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/BinaryType.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/BinaryType.java
index 656bc29f..1a7a1e9c 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/BinaryType.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/BinaryType.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.types;
 
-import org.apache.flink.table.api.ValidationException;
-
 import java.util.Objects;
 
 /** Data type of a fixed-length binary string (=a sequence of bytes). */
@@ -40,7 +38,7 @@ public class BinaryType extends DataType {
     public BinaryType(boolean isNullable, int length) {
         super(isNullable, DataTypeRoot.BINARY);
         if (length < MIN_LENGTH) {
-            throw new ValidationException(
+            throw new IllegalArgumentException(
                     String.format(
                             "Binary string length must be between %d and %d 
(both inclusive).",
                             MIN_LENGTH, MAX_LENGTH));
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/CharType.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/CharType.java
index a6b0e17b..95ce0331 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/CharType.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/CharType.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.types;
 
-import org.apache.flink.table.api.ValidationException;
-
 import java.util.Objects;
 
 /**
@@ -44,7 +42,7 @@ public class CharType extends DataType {
     public CharType(boolean isNullable, int length) {
         super(isNullable, DataTypeRoot.CHAR);
         if (length < MIN_LENGTH) {
-            throw new ValidationException(
+            throw new IllegalArgumentException(
                     String.format(
                             "Character string length must be between %d and %d 
(both inclusive).",
                             MIN_LENGTH, MAX_LENGTH));
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataField.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataField.java
index 499b18a0..ee88b44c 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataField.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataField.java
@@ -26,8 +26,8 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Objects;
 
-import static org.apache.flink.table.utils.EncodingUtils.escapeIdentifier;
-import static org.apache.flink.table.utils.EncodingUtils.escapeSingleQuotes;
+import static org.apache.flink.table.store.utils.EncodingUtils.escapeIdentifier;
+import static org.apache.flink.table.store.utils.EncodingUtils.escapeSingleQuotes;
 
 /** Defines the field of a row type. */
 public final class DataField implements Serializable {
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeJsonParser.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeJsonParser.java
index e2e273e7..60c172b7 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeJsonParser.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeJsonParser.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.types;
 
-import org.apache.flink.table.api.ValidationException;
-
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
 import javax.annotation.Nullable;
@@ -360,15 +358,15 @@ public final class DataTypeJsonParser {
             return tokens.get(lastValidToken).cursorPosition + 1;
         }
 
-        private ValidationException parsingError(String cause, @Nullable Throwable t) {
-            return new ValidationException(
+        private IllegalArgumentException parsingError(String cause, @Nullable Throwable t) {
+            return new IllegalArgumentException(
                     String.format(
                             "Could not parse type at position %d: %s\n Input 
type string: %s",
                             lastCursor(), cause, inputString),
                     t);
         }
 
-        private ValidationException parsingError(String cause) {
+        private IllegalArgumentException parsingError(String cause) {
             return parsingError(cause, null);
         }
 
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DecimalType.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DecimalType.java
index d5c71104..bff82492 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DecimalType.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DecimalType.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.types;
 
-import org.apache.flink.table.api.ValidationException;
-
 import java.util.Objects;
 
 /** Data type of a decimal number with fixed precision and scale. */
@@ -46,13 +44,13 @@ public class DecimalType extends DataType {
     public DecimalType(boolean isNullable, int precision, int scale) {
         super(isNullable, DataTypeRoot.DECIMAL);
         if (precision < MIN_PRECISION || precision > MAX_PRECISION) {
-            throw new ValidationException(
+            throw new IllegalArgumentException(
                     String.format(
                             "Decimal precision must be between %d and %d (both 
inclusive).",
                             MIN_PRECISION, MAX_PRECISION));
         }
         if (scale < MIN_SCALE || scale > precision) {
-            throw new ValidationException(
+            throw new IllegalArgumentException(
                     String.format(
                             "Decimal scale must be between %d and the 
precision %d (both inclusive).",
                             MIN_SCALE, precision));
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/LocalZonedTimestampType.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/LocalZonedTimestampType.java
index 41726a21..dcad9b39 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/LocalZonedTimestampType.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/LocalZonedTimestampType.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.types;
 
-import org.apache.flink.table.api.ValidationException;
-
 import java.util.Objects;
 
 /**
@@ -53,7 +51,7 @@ public final class LocalZonedTimestampType extends DataType {
     public LocalZonedTimestampType(boolean isNullable, int precision) {
         super(isNullable, DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
         if (precision < MIN_PRECISION || precision > MAX_PRECISION) {
-            throw new ValidationException(
+            throw new IllegalArgumentException(
                     String.format(
                             "Timestamp with local time zone precision must be 
between %d and %d (both inclusive).",
                             MIN_PRECISION, MAX_PRECISION));
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/RowType.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/RowType.java
index 34ea1067..ee60253f 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/RowType.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/RowType.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.types;
 
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -140,7 +139,7 @@ public final class RowType extends DataType {
         final List<String> fieldNames =
                 fields.stream().map(DataField::name).collect(Collectors.toList());
         if (fieldNames.stream().anyMatch(StringUtils::isNullOrWhitespaceOnly)) {
-            throw new ValidationException(
+            throw new IllegalArgumentException(
                     "Field names must contain at least one non-whitespace 
character.");
         }
         final Set<String> duplicates =
@@ -148,7 +147,7 @@ public final class RowType extends DataType {
                         .filter(n -> Collections.frequency(fieldNames, n) > 1)
                         .collect(Collectors.toSet());
         if (!duplicates.isEmpty()) {
-            throw new ValidationException(
+            throw new IllegalArgumentException(
                     String.format("Field names must be unique. Found 
duplicates: %s", duplicates));
         }
     }
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimeType.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimeType.java
index 91186d3e..239e0b5c 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimeType.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimeType.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.types;
 
-import org.apache.flink.table.api.ValidationException;
-
 import java.util.Objects;
 
 /**
@@ -49,7 +47,7 @@ public final class TimeType extends DataType {
     public TimeType(boolean isNullable, int precision) {
         super(isNullable, DataTypeRoot.TIME_WITHOUT_TIME_ZONE);
         if (precision < MIN_PRECISION || precision > MAX_PRECISION) {
-            throw new ValidationException(
+            throw new IllegalArgumentException(
                     String.format(
                             "Time precision must be between %d and %d (both 
inclusive).",
                             MIN_PRECISION, MAX_PRECISION));
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimestampType.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimestampType.java
index 19a26169..84c8d4b0 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimestampType.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimestampType.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.types;
 
-import org.apache.flink.table.api.ValidationException;
-
 import java.util.Objects;
 
 /**
@@ -48,7 +46,7 @@ public class TimestampType extends DataType {
     public TimestampType(boolean isNullable, int precision) {
         super(isNullable, DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
         if (precision < MIN_PRECISION || precision > MAX_PRECISION) {
-            throw new ValidationException(
+            throw new IllegalArgumentException(
                     String.format(
                             "Timestamp precision must be between %d and %d 
(both inclusive).",
                             MIN_PRECISION, MAX_PRECISION));
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarBinaryType.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarBinaryType.java
index 2d40e648..1a576eeb 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarBinaryType.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarBinaryType.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.types;
 
-import org.apache.flink.table.api.ValidationException;
-
 import java.util.Objects;
 
 /** Data type of a variable-length binary string (=a sequence of bytes). */
@@ -42,7 +40,7 @@ public final class VarBinaryType extends DataType {
     public VarBinaryType(boolean isNullable, int length) {
         super(isNullable, DataTypeRoot.VARBINARY);
         if (length < MIN_LENGTH) {
-            throw new ValidationException(
+            throw new IllegalArgumentException(
                     String.format(
                             "Variable binary string length must be between %d 
and %d (both inclusive).",
                             MIN_LENGTH, MAX_LENGTH));
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarCharType.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarCharType.java
index c6f0f383..0b6c74ca 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarCharType.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarCharType.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.types;
 
-import org.apache.flink.table.api.ValidationException;
-
 import java.util.Objects;
 
 /**
@@ -48,7 +46,7 @@ public final class VarCharType extends DataType {
     public VarCharType(boolean isNullable, int length) {
         super(isNullable, DataTypeRoot.VARCHAR);
         if (length < MIN_LENGTH) {
-            throw new ValidationException(
+            throw new IllegalArgumentException(
                     String.format(
                             "Variable character string length must be between 
%d and %d (both inclusive).",
                             MIN_LENGTH, MAX_LENGTH));
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/EncodingUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/EncodingUtils.java
new file mode 100644
index 00000000..2f8b3394
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/EncodingUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+/** General utilities for string-encoding. */
+public abstract class EncodingUtils {
+
+    private EncodingUtils() {
+        // do not instantiate
+    }
+
+    public static String escapeBackticks(String s) {
+        return s.replace("`", "``");
+    }
+
+    public static String escapeSingleQuotes(String s) {
+        return s.replace("'", "''");
+    }
+
+    public static String escapeIdentifier(String s) {
+        return "`" + escapeBackticks(s) + "`";
+    }
+}
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/InstantiationUtil.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/InstantiationUtil.java
new file mode 100644
index 00000000..6b72750a
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/InstantiationUtil.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.Serializable;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
+
+/** Utility class to create instances from class objects and check failure reasons. */
+public final class InstantiationUtil {
+
+    public static <T> T deserializeObject(byte[] bytes, ClassLoader cl)
+            throws IOException, ClassNotFoundException {
+
+        return deserializeObject(new ByteArrayInputStream(bytes), cl);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> T deserializeObject(InputStream in, ClassLoader cl)
+            throws IOException, ClassNotFoundException {
+
+        final ClassLoader old = Thread.currentThread().getContextClassLoader();
+        // not using try-with-resources to avoid AutoCloseable's close() on the given stream
+        try {
+            ObjectInputStream oois = new InstantiationUtil.ClassLoaderObjectInputStream(in, cl);
+            Thread.currentThread().setContextClassLoader(cl);
+            return (T) oois.readObject();
+        } finally {
+            Thread.currentThread().setContextClassLoader(old);
+        }
+    }
+
+    public static byte[] serializeObject(Object o) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+            oos.writeObject(o);
+            oos.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    /**
+     * Clones the given serializable object using Java serialization.
+     *
+     * @param obj Object to clone
+     * @param <T> Type of the object to clone
+     * @return The cloned object
+     * @throws IOException Thrown if the serialization or deserialization process fails.
+     * @throws ClassNotFoundException Thrown if any of the classes referenced by the object cannot
+     *     be resolved during deserialization.
+     */
+    public static <T extends Serializable> T clone(T obj)
+            throws IOException, ClassNotFoundException {
+        if (obj == null) {
+            return null;
+        } else {
+            return clone(obj, obj.getClass().getClassLoader());
+        }
+    }
+
+    /**
+     * Clones the given serializable object using Java serialization, using the given classloader to
+     * resolve the cloned classes.
+     *
+     * @param obj Object to clone
+     * @param classLoader The classloader to resolve the classes during deserialization.
+     * @param <T> Type of the object to clone
+     * @return Cloned object
+     * @throws IOException Thrown if the serialization or deserialization process fails.
+     * @throws ClassNotFoundException Thrown if any of the classes referenced by the object cannot
+     *     be resolved during deserialization.
+     */
+    public static <T extends Serializable> T clone(T obj, ClassLoader classLoader)
+            throws IOException, ClassNotFoundException {
+        if (obj == null) {
+            return null;
+        } else {
+            final byte[] serializedObject = serializeObject(obj);
+            return deserializeObject(serializedObject, classLoader);
+        }
+    }
+
+    /** A custom ObjectInputStream that can load classes using a specific ClassLoader. */
+    public static class ClassLoaderObjectInputStream extends ObjectInputStream {
+
+        protected final ClassLoader classLoader;
+
+        public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader)
+                throws IOException {
+            super(in);
+            this.classLoader = classLoader;
+        }
+
+        @Override
+        protected Class<?> resolveClass(ObjectStreamClass desc)
+                throws IOException, ClassNotFoundException {
+            if (classLoader != null) {
+                String name = desc.getName();
+                try {
+                    return Class.forName(name, false, classLoader);
+                } catch (ClassNotFoundException ex) {
+                    // check if class is a primitive class
+                    Class<?> cl = primitiveClasses.get(name);
+                    if (cl != null) {
+                        // return primitive class
+                        return cl;
+                    } else {
+                        // throw ClassNotFoundException
+                        throw ex;
+                    }
+                }
+            }
+
+            return super.resolveClass(desc);
+        }
+
+        @Override
+        protected Class<?> resolveProxyClass(String[] interfaces)
+                throws IOException, ClassNotFoundException {
+            if (classLoader != null) {
+                ClassLoader nonPublicLoader = null;
+                boolean hasNonPublicInterface = false;
+
+                // define proxy in class loader of non-public interface(s), if any
+                Class<?>[] classObjs = new Class<?>[interfaces.length];
+                for (int i = 0; i < interfaces.length; i++) {
+                    Class<?> cl = Class.forName(interfaces[i], false, classLoader);
+                    if ((cl.getModifiers() & Modifier.PUBLIC) == 0) {
+                        if (hasNonPublicInterface) {
+                            if (nonPublicLoader != cl.getClassLoader()) {
+                                throw new IllegalAccessError(
+                                        "conflicting non-public interface class loaders");
+                            }
+                        } else {
+                            nonPublicLoader = cl.getClassLoader();
+                            hasNonPublicInterface = true;
+                        }
+                    }
+                    classObjs[i] = cl;
+                }
+                try {
+                    return Proxy.getProxyClass(
+                            hasNonPublicInterface ? nonPublicLoader : classLoader, classObjs);
+                } catch (IllegalArgumentException e) {
+                    throw new ClassNotFoundException(null, e);
+                }
+            }
+
+            return super.resolveProxyClass(interfaces);
+        }
+
+        // ------------------------------------------------
+
+        private static final HashMap<String, Class<?>> primitiveClasses = new HashMap<>(9);
+
+        static {
+            primitiveClasses.put("boolean", boolean.class);
+            primitiveClasses.put("byte", byte.class);
+            primitiveClasses.put("char", char.class);
+            primitiveClasses.put("short", short.class);
+            primitiveClasses.put("int", int.class);
+            primitiveClasses.put("long", long.class);
+            primitiveClasses.put("float", float.class);
+            primitiveClasses.put("double", double.class);
+            primitiveClasses.put("void", void.class);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /** Private constructor to prevent instantiation. */
+    private InstantiationUtil() {
+        throw new RuntimeException();
+    }
+}
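
The InstantiationUtil above appears to be a trimmed copy of Flink's own
org.apache.flink.util.InstantiationUtil, which lets CodeGeneratorContext.scala
switch its import to the store-owned class. A hypothetical usage check (not
part of the patch) exercising the clone() round trip defined above:

    import java.util.ArrayList;
    import org.apache.flink.table.store.utils.InstantiationUtil;

    public final class CloneSketch {
        public static void main(String[] args) throws Exception {
            ArrayList<String> original = new ArrayList<>();
            original.add("table-store");
            // clone() serializes via serializeObject and reads the bytes back
            // through ClassLoaderObjectInputStream, yielding an independent copy.
            ArrayList<String> copy = InstantiationUtil.clone(original);
            System.out.println(copy.equals(original)); // true
            System.out.println(copy == original);      // false
        }
    }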
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/TypeUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/TypeUtils.java
index 8c0c4f0c..c500b206 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/TypeUtils.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/TypeUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.utils;
 
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.store.data.BinaryString;
 import org.apache.flink.table.store.data.Decimal;
 import org.apache.flink.table.store.data.Timestamp;
@@ -112,7 +111,7 @@ public class TypeUtils {
     }
 
     /** Parse a {@link BinaryString} to boolean. */
-    public static boolean toBoolean(BinaryString str) throws TableException {
+    public static boolean toBoolean(BinaryString str) {
         BinaryString lowerCase = str.toLowerCase();
         if (TRUE_STRINGS.contains(lowerCase)) {
             return true;
@@ -120,7 +119,7 @@ public class TypeUtils {
         if (FALSE_STRINGS.contains(lowerCase)) {
             return false;
         }
-        throw new TableException("Cannot parse '" + str + "' as BOOLEAN.");
+        throw new RuntimeException("Cannot parse '" + str + "' as BOOLEAN.");
     }
 
     public static int toDate(BinaryString input) throws DateTimeException {
diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/DataGenVisitorBase.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/DataGenVisitorBase.java
index 07021635..a97be0e4 100644
--- a/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/DataGenVisitorBase.java
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/DataGenVisitorBase.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.datagen;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.store.types.DataType;
 import org.apache.flink.table.store.types.DataTypeDefaultVisitor;
 import org.apache.flink.table.store.types.DateType;
@@ -59,7 +58,7 @@ public abstract class DataGenVisitorBase extends DataTypeDefaultVisitor<DataGene
 
     @Override
     protected DataGeneratorContainer defaultMethod(DataType dataType) {
-        throw new ValidationException("Unsupported type: " + dataType);
+        throw new RuntimeException("Unsupported type: " + dataType);
     }
 
     private interface SerializableSupplier<T> extends Supplier<T>, Serializable {}
diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/RandomGeneratorVisitor.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/RandomGeneratorVisitor.java
index fcc755bc..5d91e039 100644
--- a/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/RandomGeneratorVisitor.java
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/RandomGeneratorVisitor.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.store.data.BinaryString;
 import org.apache.flink.table.store.data.Decimal;
 import org.apache.flink.table.store.data.GenericArray;
@@ -318,7 +317,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 
     @Override
     protected DataGeneratorContainer defaultMethod(DataType dataType) {
-        throw new ValidationException("Unsupported type: " + dataType);
+        throw new RuntimeException("Unsupported type: " + dataType);
     }
 
     private static RandomGenerator<BinaryString> getRandomStringGenerator(int length) {
diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/types/DataTypesTest.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/types/DataTypesTest.java
index 08f553ee..b986d9b7 100644
--- a/flink-table-store-common/src/test/java/org/apache/flink/table/store/types/DataTypesTest.java
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/types/DataTypesTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.types;
 
-import org.apache.flink.table.api.ValidationException;
-
 import org.assertj.core.api.Assertions;
 import org.assertj.core.api.ObjectAssert;
 import org.assertj.core.api.ThrowingConsumer;
@@ -198,14 +196,14 @@ public class DataTypesTest {
                                                 new DataField(1, "b", new VarCharType()),
                                                 new DataField(2, "a", new VarCharType()),
                                                 new DataField(3, "a", new TimestampType()))))
-                .isInstanceOf(ValidationException.class);
+                .isInstanceOf(IllegalArgumentException.class);
 
         assertThatThrownBy(
                         () ->
                                 new RowType(
                                         Collections.singletonList(
                                                 new DataField(0, "", new VarCharType()))))
-                .isInstanceOf(ValidationException.class);
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index bf915466..0293d744 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -45,6 +45,7 @@ import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.catalog.Identifier;
 import org.apache.flink.table.store.file.schema.SchemaChange;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.table.FileStoreTable;
@@ -159,9 +160,9 @@ public class FlinkCatalog extends AbstractCatalog {
             throws TableNotExistException, CatalogException {
         Table table;
         try {
-            table = catalog.getTable(tablePath);
+            table = catalog.getTable(toIdentifier(tablePath));
         } catch (Catalog.TableNotExistException e) {
-            throw new TableNotExistException(getName(), e.tablePath());
+            throw new TableNotExistException(getName(), tablePath);
         }
 
         if (table instanceof FileStoreTable) {
@@ -170,7 +171,7 @@ public class FlinkCatalog extends AbstractCatalog {
             // add path to source and sink
             catalogTable
                     .getOptions()
-                    .put(PATH.key(), catalog.getTableLocation(tablePath).toString());
+                    .put(PATH.key(), catalog.getTableLocation(toIdentifier(tablePath)).toString());
             return catalogTable;
         } else {
             return new SystemCatalogTable(table);
@@ -179,16 +180,16 @@ public class FlinkCatalog extends AbstractCatalog {
 
     @Override
     public boolean tableExists(ObjectPath tablePath) throws CatalogException {
-        return catalog.tableExists(tablePath);
+        return catalog.tableExists(toIdentifier(tablePath));
     }
 
     @Override
     public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
         try {
-            catalog.dropTable(tablePath, ignoreIfNotExists);
+            catalog.dropTable(toIdentifier(tablePath), ignoreIfNotExists);
         } catch (Catalog.TableNotExistException e) {
-            throw new TableNotExistException(getName(), e.tablePath());
+            throw new TableNotExistException(getName(), tablePath);
         }
     }
 
@@ -217,9 +218,11 @@ public class FlinkCatalog extends AbstractCatalog {
 
         try {
             catalog.createTable(
-                    tablePath, FlinkCatalog.fromCatalogTable(catalogTable), ignoreIfExists);
+                    toIdentifier(tablePath),
+                    FlinkCatalog.fromCatalogTable(catalogTable),
+                    ignoreIfExists);
         } catch (Catalog.TableAlreadyExistException e) {
-            throw new TableAlreadyExistException(getName(), e.tablePath());
+            throw new TableAlreadyExistException(getName(), tablePath);
         } catch (Catalog.DatabaseNotExistException e) {
             throw new DatabaseNotExistException(getName(), e.database());
         }
@@ -265,9 +268,9 @@ public class FlinkCatalog extends AbstractCatalog {
                         });
 
         try {
-            catalog.alterTable(tablePath, changes, ignoreIfNotExists);
+            catalog.alterTable(toIdentifier(tablePath), changes, ignoreIfNotExists);
         } catch (Catalog.TableNotExistException e) {
-            throw new TableNotExistException(getName(), e.tablePath());
+            throw new TableNotExistException(getName(), tablePath);
         }
     }
 
@@ -372,6 +375,10 @@ public class FlinkCatalog extends AbstractCatalog {
                 catalogTable.getComment());
     }
 
+    public static Identifier toIdentifier(ObjectPath path) {
+        return new Identifier(path.getDatabaseName(), path.getObjectName());
+    }
+
     // --------------------- unsupported methods ----------------------------
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 27b4aaf3..4abf0ee6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
 import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.connector.FlinkCatalog;
 import org.apache.flink.table.store.connector.FlinkConnectorOptions;
 import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider;
 import org.apache.flink.table.store.file.catalog.CatalogLock;
@@ -49,7 +50,7 @@ import java.util.Map;
 import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
 
-/** Table sink to create {@link StoreSink}. */
+/** Table sink to create a table store sink. */
 public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
 
     private final ObjectIdentifier tableIdentifier;
@@ -129,7 +130,10 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
                                                 dataStream.getExecutionEnvironment(),
                                                 dataStream.getTransformation()))
                                 .withLockFactory(
-                                        Lock.factory(lockFactory, tableIdentifier.toObjectPath()))
+                                        Lock.factory(
+                                                lockFactory,
+                                                FlinkCatalog.toIdentifier(
+                                                        tableIdentifier.toObjectPath())))
                                 .withLogSinkFunction(logSinkFunction)
                                 .withOverwritePartition(overwrite ? staticPartitions : null)
                                 .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
index 354c7a5f..757934eb 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.connector;
 
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.types.Row;
@@ -43,7 +42,7 @@ public class AppendOnlyTableITCase extends CatalogITCaseBase {
                                 batchSql(
                                         "CREATE TABLE pk_table (id INT PRIMARY 
KEY NOT ENFORCED, data STRING) "
                                                 + "WITH 
('write-mode'='append-only')"))
-                .hasRootCauseInstanceOf(TableException.class)
+                .hasRootCauseInstanceOf(RuntimeException.class)
                 .hasRootCauseMessage(
                         "Cannot define any primary key in an append-only 
table. Set 'write-mode'='change-log' if still "
                                 + "want to keep the primary key definition.");
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
index 93271875..9aedf0d8 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
@@ -411,7 +411,10 @@ public class FlinkCatalogTest {
     }
 
     private void checkEquals(ObjectPath path, CatalogTable t1, CatalogTable t2) {
-        Path tablePath = ((FlinkCatalog) catalog).catalog().getTableLocation(path);
+        Path tablePath =
+                ((FlinkCatalog) catalog)
+                        .catalog()
+                        .getTableLocation(FlinkCatalog.toIdentifier(path));
         Map<String, String> options = new HashMap<>(t1.getOptions());
         options.put("path", tablePath.toString());
         t1 = ((ResolvedCatalogTable) t1).copy(options);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index cad59467..ffc19655 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
@@ -1107,7 +1106,7 @@ public class ReadWriteTableITCase extends AbstractTestBase {
                 .containsExactlyInAnyOrder(changelogRow("+I", "US Dollar", 102L, "2022-06-20"));
         assertThatThrownBy(() -> insertInto(table, "('US Dollar', 102, '2022-06-20')"))
                 .rootCause()
-                .isInstanceOf(TableException.class)
+                .isInstanceOf(RuntimeException.class)
                 .hasMessage(
                         String.format(
                                 "Try to write partition {dt=2022-06-20} with a 
new bucket num %d, but the previous bucket num is 2. "
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
index 328045c4..0ecc32ed 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
@@ -194,7 +193,7 @@ public class RescaleBucketITCase extends CatalogITCaseBase {
         // check write without rescale
         assertThatThrownBy(() -> batchSql("INSERT INTO %s VALUES (6)", tableName))
                 .getRootCause()
-                .isInstanceOf(TableException.class)
+                .isInstanceOf(RuntimeException.class)
                 .hasMessage(
                         "Try to write table with a new bucket num 4, but the 
previous bucket num is 2. "
                                 + "Please switch to batch mode, and perform 
INSERT OVERWRITE to rescale current data layout first.");
diff --git a/flink-table-store-core/pom.xml b/flink-table-store-core/pom.xml
index 478dc980..3b54e349 100644
--- a/flink-table-store-core/pom.xml
+++ b/flink-table-store-core/pom.xml
@@ -53,13 +53,6 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-jackson</artifactId>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index aedfddd1..6eba7099 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -29,7 +29,6 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.configuration.description.InlineElement;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.format.FileFormat;
@@ -847,7 +846,7 @@ public class CoreOptions implements Serializable {
 
         // Cannot define any primary key in an append-only table.
         if (!schema.primaryKeys().isEmpty() && Objects.equals(APPEND_ONLY, options.writeMode())) {
-            throw new TableException(
+            throw new RuntimeException(
                     "Cannot define any primary key in an append-only table. 
Set 'write-mode'='change-log' if "
                             + "still want to keep the primary key 
definition.");
         }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
index 4b7a7b70..a356ee76 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.file.catalog;
 
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 import org.apache.flink.table.store.table.Table;
@@ -33,19 +32,19 @@ public abstract class AbstractCatalog implements Catalog {
     protected static final String DB_SUFFIX = ".db";
 
     @Override
-    public Path getTableLocation(ObjectPath tablePath) {
-        if (tablePath.getObjectName().contains(SYSTEM_TABLE_SPLITTER)) {
+    public Path getTableLocation(Identifier identifier) {
+        if (identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER)) {
             throw new IllegalArgumentException(
                     String.format(
                             "Table name[%s] cannot contain '%s' separator",
-                            tablePath.getObjectName(), SYSTEM_TABLE_SPLITTER));
+                            identifier.getObjectName(), SYSTEM_TABLE_SPLITTER));
         }
-        return new Path(databasePath(tablePath.getDatabaseName()), tablePath.getObjectName());
+        return new Path(databasePath(identifier.getDatabaseName()), identifier.getObjectName());
     }
 
     @Override
-    public Table getTable(ObjectPath tablePath) throws TableNotExistException {
-        String inputTableName = tablePath.getObjectName();
+    public Table getTable(Identifier identifier) throws TableNotExistException {
+        String inputTableName = identifier.getObjectName();
         if (inputTableName.contains(SYSTEM_TABLE_SPLITTER)) {
             String[] splits = StringUtils.split(inputTableName, SYSTEM_TABLE_SPLITTER);
             if (splits.length != 2) {
@@ -55,15 +54,15 @@ public abstract class AbstractCatalog implements Catalog {
             }
             String table = splits[0];
             String type = splits[1];
-            ObjectPath originTablePath = new ObjectPath(tablePath.getDatabaseName(), table);
-            if (!tableExists(originTablePath)) {
-                throw new TableNotExistException(tablePath);
+            Identifier originIdentifier = new Identifier(identifier.getDatabaseName(), table);
+            if (!tableExists(originIdentifier)) {
+                throw new TableNotExistException(identifier);
             }
-            Path location = getTableLocation(originTablePath);
+            Path location = getTableLocation(originIdentifier);
             return SystemTableLoader.load(type, location);
         } else {
-            TableSchema tableSchema = getTableSchema(tablePath);
-            return FileStoreTableFactory.create(getTableLocation(tablePath), tableSchema);
+            TableSchema tableSchema = getTableSchema(identifier);
+            return FileStoreTableFactory.create(getTableLocation(identifier), tableSchema);
         }
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index 919f6067..5cb76ade 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.file.catalog;
 
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.schema.SchemaChange;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -94,72 +93,72 @@ public interface Catalog extends AutoCloseable {
     List<String> listTables(String databaseName) throws DatabaseNotExistException;
 
     /**
-     * Return the table location path identified by the given {@link ObjectPath}.
+     * Return the table location path identified by the given {@link Identifier}.
      *
-     * @param tablePath Path of the table
+     * @param identifier Identifier of the table
      * @return The requested table location
      */
-    Path getTableLocation(ObjectPath tablePath);
+    Path getTableLocation(Identifier identifier);
 
     /**
-     * Return a {@link TableSchema} identified by the given {@link ObjectPath}.
+     * Return a {@link TableSchema} identified by the given {@link Identifier}.
      *
-     * @param tablePath Path of the table
+     * @param identifier Identifier of the table
      * @return The requested table schema
      * @throws TableNotExistException if the target does not exist
      */
-    TableSchema getTableSchema(ObjectPath tablePath) throws TableNotExistException;
+    TableSchema getTableSchema(Identifier identifier) throws TableNotExistException;
 
     /**
-     * Return a {@link Table} identified by the given {@link ObjectPath}.
+     * Return a {@link Table} identified by the given {@link Identifier}.
      *
-     * @param tablePath Path of the table
+     * @param identifier Identifier of the table
      * @return The requested table
      * @throws TableNotExistException if the target does not exist
      */
-    Table getTable(ObjectPath tablePath) throws TableNotExistException;
+    Table getTable(Identifier identifier) throws TableNotExistException;
 
     /**
      * Check if a table exists in this catalog.
      *
-     * @param tablePath Path of the table
+     * @param identifier Identifier of the table
     * @return true if the given table exists in the catalog, false otherwise
      */
-    boolean tableExists(ObjectPath tablePath);
+    boolean tableExists(Identifier identifier);
 
     /**
      * Drop a table.
      *
-     * @param tablePath Path of the table to be dropped
+     * @param identifier Identifier of the table to be dropped
     * @param ignoreIfNotExists Flag to specify behavior when the table does not exist: if set to
      *     false, throw an exception, if set to true, do nothing.
      * @throws TableNotExistException if the table does not exist
      */
-    void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException;
+    void dropTable(Identifier identifier, boolean ignoreIfNotExists) throws TableNotExistException;
 
     /**
      * Create a new table.
      *
-     * @param tablePath path of the table to be created
+     * @param identifier identifier of the table to be created
      * @param tableSchema the table definition
     * @param ignoreIfExists flag to specify behavior when a table already exists at the given path:
     *     if set to false, it throws a TableAlreadyExistException, if set to true, do nothing.
     * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false
-     * @throws DatabaseNotExistException if the database in tablePath doesn't exist
+     * @throws DatabaseNotExistException if the database in identifier doesn't exist
      */
-    void createTable(ObjectPath tablePath, UpdateSchema tableSchema, boolean ignoreIfExists)
+    void createTable(Identifier identifier, UpdateSchema tableSchema, boolean ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException;
 
     /**
      * Modify an existing table from {@link SchemaChange}s.
      *
-     * @param tablePath path of the table to be modified
+     * @param identifier identifier of the table to be modified
      * @param changes the schema changes
     * @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to
      *     false, throw an exception, if set to true, do nothing.
      * @throws TableNotExistException if the table does not exist
      */
-    void alterTable(ObjectPath tablePath, List<SchemaChange> changes, boolean ignoreIfNotExists)
+    void alterTable(Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
             throws TableNotExistException;
 
     /** Exception for trying to drop on a database that is not empty. */
@@ -227,19 +226,19 @@ public interface Catalog extends AutoCloseable {
 
         private static final String MSG = "Table %s already exists.";
 
-        private final ObjectPath tablePath;
+        private final Identifier identifier;
 
-        public TableAlreadyExistException(ObjectPath tablePath) {
-            this(tablePath, null);
+        public TableAlreadyExistException(Identifier identifier) {
+            this(identifier, null);
         }
 
-        public TableAlreadyExistException(ObjectPath tablePath, Throwable cause) {
-            super(String.format(MSG, tablePath.getFullName()), cause);
-            this.tablePath = tablePath;
+        public TableAlreadyExistException(Identifier identifier, Throwable cause) {
+            super(String.format(MSG, identifier.getFullName()), cause);
+            this.identifier = identifier;
         }
 
-        public ObjectPath tablePath() {
-            return tablePath;
+        public Identifier identifier() {
+            return identifier;
         }
     }
 
@@ -248,19 +247,19 @@ public interface Catalog extends AutoCloseable {
 
         private static final String MSG = "Table %s does not exist.";
 
-        private final ObjectPath tablePath;
+        private final Identifier identifier;
 
-        public TableNotExistException(ObjectPath tablePath) {
-            this(tablePath, null);
+        public TableNotExistException(Identifier identifier) {
+            this(identifier, null);
         }
 
-        public TableNotExistException(ObjectPath tablePath, Throwable cause) {
-            super(String.format(MSG, tablePath.getFullName()), cause);
-            this.tablePath = tablePath;
+        public TableNotExistException(Identifier identifier, Throwable cause) {
+            super(String.format(MSG, identifier.getFullName()), cause);
+            this.identifier = identifier;
         }
 
-        public ObjectPath tablePath() {
-            return tablePath;
+        public Identifier identifier() {
+            return identifier;
         }
     }
 }
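Taken together, a rough usage sketch of the Identifier-based interface
(hedged: the catalog and UpdateSchema instances are hypothetical, and the
checked exceptions declared above are left unhandled for brevity):

    Identifier id = Identifier.fromString("default.orders");
    if (!catalog.tableExists(id)) {
        // throws TableAlreadyExistException / DatabaseNotExistException
        catalog.createTable(id, updateSchema, false);
    }
    TableSchema schema = catalog.getTableSchema(id); // throws TableNotExistException
    catalog.dropTable(id, true);                     // ignoreIfNotExists = true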
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index 6dae077e..d9c54cb1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.catalog;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.schema.SchemaChange;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
@@ -113,51 +112,51 @@ public class FileSystemCatalog extends AbstractCatalog {
     }
 
     @Override
-    public TableSchema getTableSchema(ObjectPath tablePath) throws TableNotExistException {
-        Path path = getTableLocation(tablePath);
+    public TableSchema getTableSchema(Identifier identifier) throws TableNotExistException {
+        Path path = getTableLocation(identifier);
+        Path path = getTableLocation(identifier);
         return new SchemaManager(path)
                 .latest()
-                .orElseThrow(() -> new TableNotExistException(tablePath));
+                .orElseThrow(() -> new TableNotExistException(identifier));
     }
 
     @Override
-    public boolean tableExists(ObjectPath tablePath) {
-        return tableExists(getTableLocation(tablePath));
+    public boolean tableExists(Identifier identifier) {
+        return tableExists(getTableLocation(identifier));
     }
 
-    private boolean tableExists(Path tablePath) {
-        return new SchemaManager(tablePath).listAllIds().size() > 0;
+    private boolean tableExists(Path location) {
+        return new SchemaManager(location).listAllIds().size() > 0;
     }
 
     @Override
-    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+    public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
             throws TableNotExistException {
-        Path path = getTableLocation(tablePath);
+        Path path = getTableLocation(identifier);
         if (!tableExists(path)) {
             if (ignoreIfNotExists) {
                 return;
             }
 
-            throw new TableNotExistException(tablePath);
+            throw new TableNotExistException(identifier);
         }
 
         uncheck(() -> fs.delete(path, true));
     }
 
     @Override
-    public void createTable(ObjectPath tablePath, UpdateSchema table, boolean ignoreIfExists)
+    public void createTable(Identifier identifier, UpdateSchema table, boolean ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException {
-        if (!databaseExists(tablePath.getDatabaseName())) {
-            throw new DatabaseNotExistException(tablePath.getDatabaseName());
+        if (!databaseExists(identifier.getDatabaseName())) {
+            throw new DatabaseNotExistException(identifier.getDatabaseName());
         }
 
-        Path path = getTableLocation(tablePath);
+        Path path = getTableLocation(identifier);
         if (tableExists(path)) {
             if (ignoreIfExists) {
                 return;
             }
 
-            throw new TableAlreadyExistException(tablePath);
+            throw new TableAlreadyExistException(identifier);
         }
 
         uncheck(() -> new SchemaManager(path).commitNewVersion(table));
@@ -165,12 +164,12 @@ public class FileSystemCatalog extends AbstractCatalog {
 
     @Override
     public void alterTable(
-            ObjectPath tablePath, List<SchemaChange> changes, boolean ignoreIfNotExists)
+            Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
             throws TableNotExistException {
-        if (!tableExists(tablePath)) {
-            throw new TableNotExistException(tablePath);
+        if (!tableExists(identifier)) {
+            throw new TableNotExistException(identifier);
         }
-        uncheck(() -> new SchemaManager(getTableLocation(tablePath)).commitChanges(changes));
+        uncheck(() -> new SchemaManager(getTableLocation(identifier)).commitChanges(changes));
     }
 
     private static <T> T uncheck(Callable<T> callable) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java
new file mode 100644
index 00000000..0d3735ef
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Identifies an object in a catalog. */
+public class Identifier implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String database;
+    private final String table;
+
+    public Identifier(String database, String table) {
+        this.database = database;
+        this.table = table;
+    }
+
+    public String getDatabaseName() {
+        return database;
+    }
+
+    public String getObjectName() {
+        return table;
+    }
+
+    public String getFullName() {
+        return String.format("%s.%s", database, table);
+    }
+
+    public static Identifier fromString(String fullName) {
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(fullName), "fullName cannot be null or empty");
+
+        String[] paths = fullName.split("\\.");
+
+        if (paths.length != 2) {
+            throw new IllegalArgumentException(
+                    String.format("Cannot split '%s' into database and table", fullName));
+        }
+
+        return new Identifier(paths[0], paths[1]);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Identifier that = (Identifier) o;
+        return Objects.equals(database, that.database) && Objects.equals(table, that.table);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(database, table);
+    }
+
+    @Override
+    public String toString() {
+        return "Identifier{" + "database='" + database + '\'' + ", table='" + table + '\'' + '}';
+    }
+}
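The new class round-trips cleanly between its parts and the dotted full
name; for example:

    Identifier id = Identifier.fromString("default.orders");
    id.getDatabaseName();                            // "default"
    id.getObjectName();                              // "orders"
    id.getFullName();                                // "default.orders"
    id.equals(new Identifier("default", "orders"));  // true
    // "default" or "a.b.c" cannot be split into exactly two parts,
    // so fromString throws IllegalArgumentException for them.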
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java
index 2a7e52e4..7625761c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.file.mergetree.compact.aggregate;
 
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.store.types.DataType;
 
 import java.io.Serializable;
@@ -64,7 +63,7 @@ public abstract class FieldAggregator implements Serializable {
                     fieldAggregator = new FieldBoolAndAgg(fieldType);
                     break;
                 default:
-                    throw new ValidationException(
+                    throw new RuntimeException(
                             "Use unsupported aggregation or spell aggregate 
function incorrectly!");
             }
         }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 7dfd3dfc..79bbace2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.file.operation;
 
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.store.data.BinaryRow;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
@@ -219,7 +218,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
                                                                 .defaultValue())
                                                 .generatePartValues(file.partition())
                                 : "table";
-                throw new TableException(
+                throw new RuntimeException(
                         String.format(
                                 "Try to write %s with a new bucket num %d, but 
the previous bucket num is %d. "
                                         + "Please switch to batch mode, and 
perform INSERT OVERWRITE to rescale current data layout first.",
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
index ca3ab6cc..a4ee9c73 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.store.file.operation;
 
-import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.table.store.file.catalog.Identifier;
 
 import javax.annotation.Nullable;
 
@@ -37,7 +37,7 @@ public interface Lock extends AutoCloseable {
         Lock create();
     }
 
-    static Factory factory(@Nullable CatalogLock.Factory lockFactory, ObjectPath tablePath) {
+    static Factory factory(@Nullable CatalogLock.Factory lockFactory, Identifier tablePath) {
         return lockFactory == null
                 ? new EmptyFactory()
                 : new CatalogLockFactory(lockFactory, tablePath);
@@ -53,9 +53,9 @@ public interface Lock extends AutoCloseable {
         private static final long serialVersionUID = 1L;
 
         private final CatalogLock.Factory lockFactory;
-        private final ObjectPath tablePath;
+        private final Identifier tablePath;
 
-        public CatalogLockFactory(CatalogLock.Factory lockFactory, ObjectPath tablePath) {
+        public CatalogLockFactory(CatalogLock.Factory lockFactory, Identifier tablePath) {
             this.lockFactory = lockFactory;
             this.tablePath = tablePath;
         }
@@ -88,7 +88,7 @@ public interface Lock extends AutoCloseable {
         public void close() {}
     }
 
-    static Lock fromCatalog(CatalogLock lock, ObjectPath tablePath) {
+    static Lock fromCatalog(CatalogLock lock, Identifier tablePath) {
         if (lock == null) {
             return new EmptyLock();
         }
@@ -99,9 +99,9 @@ public interface Lock extends AutoCloseable {
     class CatalogLockImpl implements Lock {
 
         private final CatalogLock catalogLock;
-        private final ObjectPath tablePath;
+        private final Identifier tablePath;
 
-        private CatalogLockImpl(CatalogLock catalogLock, ObjectPath tablePath) {
+        private CatalogLockImpl(CatalogLock catalogLock, Identifier tablePath) {
             this.catalogLock = catalogLock;
             this.tablePath = tablePath;
         }
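A short sketch of how the relocated Identifier threads through the lock
helpers (hedged: catalogLock stands in for a real CatalogLock
implementation, such as the Hive lock further below):

    Identifier id = new Identifier("default", "orders");
    // No catalog lock factory configured: a no-op EmptyFactory is returned.
    Lock.Factory factory = Lock.factory(null, id);
    // With a CatalogLock, callables run under the database/table lock.
    Lock lock = Lock.fromCatalog(catalogLock, id);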
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index 8db38280..b7e81bb4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.data.BinaryRow;
 import org.apache.flink.table.store.file.io.DataFilePathFactory;
 import org.apache.flink.table.store.types.RowType;
-import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.concurrent.ThreadSafe;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/PartitionPathUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/PartitionPathUtils.java
new file mode 100644
index 00000000..49f7e333
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/PartitionPathUtils.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.fs.Path;
+
+import java.util.BitSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Utils for generating and escaping partition paths. */
+public class PartitionPathUtils {
+
+    private static final BitSet CHAR_TO_ESCAPE = new BitSet(128);
+
+    static {
+        for (char c = 0; c < ' '; c++) {
+            CHAR_TO_ESCAPE.set(c);
+        }
+
+        /*
+         * ASCII 01-1F are HTTP control characters that need to be escaped.
+         * \u000A and \u000D are \n and \r, respectively.
+         */
+        char[] clist =
+                new char[] {
+                    '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008',
+                    '\u0009', '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010',
+                    '\u0011', '\u0012', '\u0013', '\u0014', '\u0015', '\u0016', '\u0017', '\u0018',
+                    '\u0019', '\u001A', '\u001B', '\u001C', '\u001D', '\u001E', '\u001F', '"', '#',
+                    '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F', '{', '}', '[', ']', '^'
+                };
+
+        for (char c : clist) {
+            CHAR_TO_ESCAPE.set(c);
+        }
+    }
+
+    private static boolean needsEscaping(char c) {
+        return c < CHAR_TO_ESCAPE.size() && CHAR_TO_ESCAPE.get(c);
+    }
+
+    /**
+     * Make partition path from partition spec.
+     *
+     * @param partitionSpec The partition spec.
+     * @return An escaped, valid partition name.
+     */
+    public static String generatePartitionPath(LinkedHashMap<String, String> partitionSpec) {
+        if (partitionSpec.isEmpty()) {
+            return "";
+        }
+        StringBuilder suffixBuf = new StringBuilder();
+        int i = 0;
+        for (Map.Entry<String, String> e : partitionSpec.entrySet()) {
+            if (i > 0) {
+                suffixBuf.append(Path.SEPARATOR);
+            }
+            suffixBuf.append(escapePathName(e.getKey()));
+            suffixBuf.append('=');
+            suffixBuf.append(escapePathName(e.getValue()));
+            i++;
+        }
+        suffixBuf.append(Path.SEPARATOR);
+        return suffixBuf.toString();
+    }
+
+    /**
+     * Escapes a path name.
+     *
+     * @param path The path to escape.
+     * @return An escaped path name.
+     */
+    static String escapePathName(String path) {
+        if (path == null || path.length() == 0) {
+            throw new RuntimeException("Path should not be null or empty: " + path);
+        }
+
+        StringBuilder sb = null;
+        for (int i = 0; i < path.length(); i++) {
+            char c = path.charAt(i);
+            if (needsEscaping(c)) {
+                if (sb == null) {
+                    sb = new StringBuilder(path.length() + 2);
+                    for (int j = 0; j < i; j++) {
+                        sb.append(path.charAt(j));
+                    }
+                }
+                escapeChar(c, sb);
+            } else if (sb != null) {
+                sb.append(c);
+            }
+        }
+        if (sb == null) {
+            return path;
+        }
+        return sb.toString();
+    }
+
+    private static void escapeChar(char c, StringBuilder sb) {
+        sb.append('%');
+        if (c < 16) {
+            sb.append('0');
+        }
+        sb.append(Integer.toHexString(c).toUpperCase());
+    }
+}
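The copied-in utility mirrors the org.apache.flink.table.utils class it
replaces; for example (escapePathName is package-private, so shown as if
called from the same package):

    LinkedHashMap<String, String> spec = new LinkedHashMap<>();
    spec.put("dt", "2023-01-17");
    spec.put("hr", "08");
    PartitionPathUtils.generatePartitionPath(spec);  // "dt=2023-01-17/hr=08/"

    // Reserved characters are percent-encoded in upper-case hex:
    PartitionPathUtils.escapePathName("a:b");        // "a%3Ab"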
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
index bee9b94f..feef902d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.table.source.snapshot;
 
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.schema.TableSchema;
@@ -191,7 +190,7 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
         if (schema.primaryKeys().size() > 0
                 && mergeEngineDesc.containsKey(mergeEngine)
                 && options.changelogProducer() != FULL_COMPACTION) {
-            throw new ValidationException(
+            throw new RuntimeException(
                     mergeEngineDesc.get(mergeEngine)
                             + " continuous reading is not supported. "
                             + "You can use full compaction changelog producer 
to support streaming reading.");
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/DataTypeJsonParserTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/DataTypeJsonParserTest.java
index cbaad530..0d6dbfec 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/DataTypeJsonParserTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/DataTypeJsonParserTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.file.schema;
 
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.types.ArrayType;
 import org.apache.flink.table.store.types.BigIntType;
@@ -195,7 +194,7 @@ public class DataTypeJsonParserTest {
     void testErrorMessage(TestSpec testSpec) {
         if (testSpec.expectedErrorMessage != null) {
             assertThatThrownBy(() -> parse(testSpec.jsonString))
-                    .isInstanceOf(ValidationException.class)
+                    .isInstanceOf(IllegalArgumentException.class)
                     .hasMessageContaining(testSpec.expectedErrorMessage);
         }
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index c11d091e..6830c5f8 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.file.schema;
 
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
@@ -293,7 +292,7 @@ public class SchemaManagerTest {
                         () ->
                                 retryArtificialException(
                                        () -> manager.commitNewVersion(updateSchema)))
-                .isInstanceOf(TableException.class)
+                .isInstanceOf(RuntimeException.class)
                 .hasMessage(
                         "Cannot define any primary key in an append-only 
table. "
                                 + "Set 'write-mode'='change-log' if still want 
to keep the primary key definition.");
diff --git a/flink-table-store-format/pom.xml b/flink-table-store-format/pom.xml
index 3ace2d3e..abab49c9 100644
--- a/flink-table-store-format/pom.xml
+++ b/flink-table-store-format/pom.xml
@@ -44,18 +44,9 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
-        <!-- flink dependencies -->
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>${flink.table.runtime}</artifactId>
+            <artifactId>flink-core</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroSchemaConverter.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroSchemaConverter.java
index 641c495d..c8f36860 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroSchemaConverter.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroSchemaConverter.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.format.avro;
 
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.store.types.ArrayType;
 import org.apache.flink.table.store.types.DataType;
 import org.apache.flink.table.store.types.DataTypeRoot;
@@ -36,11 +35,7 @@ import org.apache.avro.SchemaBuilder;
 
 import java.util.List;
 
-/**
- * Converts an Avro schema into Flink's type information. It uses {@link RowTypeInfo} for
- * representing objects and converts Avro types into types that are compatible with Flink's Table &
- * SQL API.
- */
+/** Converts between Avro schemas and Table Store data types. */
 public class AvroSchemaConverter {
 
     private AvroSchemaConverter() {
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index f513ee62..418a2f38 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -19,9 +19,9 @@
 package org.apache.flink.table.store.hive;
 
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.catalog.AbstractCatalog;
 import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.table.store.file.catalog.Identifier;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.schema.SchemaChange;
 import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -157,7 +157,7 @@ public class HiveCatalog extends AbstractCatalog {
                     .filter(
                             tableName ->
                                     tableStoreTableExists(
-                                            new ObjectPath(databaseName, tableName), false))
+                                            new Identifier(databaseName, tableName), false))
                     .collect(Collectors.toList());
         } catch (UnknownDBException e) {
             throw new DatabaseNotExistException(databaseName, e);
@@ -167,11 +167,11 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @Override
-    public TableSchema getTableSchema(ObjectPath tablePath) throws TableNotExistException {
-        if (!tableStoreTableExists(tablePath)) {
-            throw new TableNotExistException(tablePath);
+    public TableSchema getTableSchema(Identifier identifier) throws TableNotExistException {
+        if (!tableStoreTableExists(identifier)) {
+            throw new TableNotExistException(identifier);
+        if (!tableStoreTableExists(identifier)) {
+            throw new TableNotExistException(identifier);
         }
-        Path tableLocation = getTableLocation(tablePath);
+        Path tableLocation = getTableLocation(identifier);
         return new SchemaManager(tableLocation)
                 .latest()
                 .orElseThrow(
@@ -179,41 +179,42 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @Override
-    public boolean tableExists(ObjectPath tablePath) {
-        return tableStoreTableExists(tablePath);
+    public boolean tableExists(Identifier identifier) {
+        return tableStoreTableExists(identifier);
     }
 
     @Override
-    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+    public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
             throws TableNotExistException {
-        if (!tableStoreTableExists(tablePath)) {
+        if (!tableStoreTableExists(identifier)) {
             if (ignoreIfNotExists) {
                 return;
             } else {
-                throw new TableNotExistException(tablePath);
+                throw new TableNotExistException(identifier);
             }
         }
 
         try {
             client.dropTable(
-                    tablePath.getDatabaseName(), tablePath.getObjectName(), true, false, true);
+                    identifier.getDatabaseName(), identifier.getObjectName(), true, false, true);
         } catch (TException e) {
-            throw new RuntimeException("Failed to drop table " + 
tablePath.getFullName(), e);
+            throw new RuntimeException("Failed to drop table " + 
identifier.getFullName(), e);
         }
     }
 
     @Override
-    public void createTable(ObjectPath tablePath, UpdateSchema updateSchema, boolean ignoreIfExists)
+    public void createTable(
+            Identifier identifier, UpdateSchema updateSchema, boolean ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException {
-        String databaseName = tablePath.getDatabaseName();
+        String databaseName = identifier.getDatabaseName();
         if (!databaseExists(databaseName)) {
             throw new DatabaseNotExistException(databaseName);
         }
-        if (tableExists(tablePath)) {
+        if (tableExists(identifier)) {
             if (ignoreIfExists) {
                 return;
             } else {
-                throw new TableAlreadyExistException(tablePath);
+                throw new TableAlreadyExistException(identifier);
             }
         }
 
@@ -222,43 +223,43 @@ public class HiveCatalog extends AbstractCatalog {
         // if changes on Hive fail there is no harm in performing the same changes to files again
         TableSchema schema;
         try {
-            schema = schemaManager(tablePath).commitNewVersion(updateSchema);
+            schema = schemaManager(identifier).commitNewVersion(updateSchema);
         } catch (Exception e) {
             throw new RuntimeException(
                     "Failed to commit changes of table "
-                            + tablePath.getFullName()
+                            + identifier.getFullName()
                             + " to underlying files",
                     e);
         }
-        Table table = newHmsTable(tablePath);
-        updateHmsTable(table, tablePath, schema);
+        Table table = newHmsTable(identifier);
+        updateHmsTable(table, identifier, schema);
         try {
             client.createTable(table);
         } catch (TException e) {
-            throw new RuntimeException("Failed to create table " + 
tablePath.getFullName(), e);
+            throw new RuntimeException("Failed to create table " + 
identifier.getFullName(), e);
         }
     }
 
     @Override
     public void alterTable(
-            ObjectPath tablePath, List<SchemaChange> changes, boolean ignoreIfNotExists)
+            Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
             throws TableNotExistException {
-        if (!tableStoreTableExists(tablePath)) {
+        if (!tableStoreTableExists(identifier)) {
             if (ignoreIfNotExists) {
                 return;
             } else {
-                throw new TableNotExistException(tablePath);
+                throw new TableNotExistException(identifier);
             }
         }
 
         try {
             // first commit changes to underlying files
-            TableSchema schema = schemaManager(tablePath).commitChanges(changes);
+            TableSchema schema = schemaManager(identifier).commitChanges(changes);
 
             // sync to hive hms
-            Table table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
-            updateHmsTable(table, tablePath, schema);
-            client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
+            Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
+            updateHmsTable(table, identifier, schema);
+            client.alter_table(identifier.getDatabaseName(), identifier.getObjectName(), table);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -274,16 +275,16 @@ public class HiveCatalog extends AbstractCatalog {
         return hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
     }
 
-    private void checkObjectPathUpperCase(ObjectPath objectPath) {
+    private void checkIdentifierUpperCase(Identifier identifier) {
         checkState(
-                objectPath.getDatabaseName().equals(objectPath.getDatabaseName().toLowerCase()),
+                identifier.getDatabaseName().equals(identifier.getDatabaseName().toLowerCase()),
                 String.format(
                         "Database name[%s] cannot contain upper case",
-                        objectPath.getDatabaseName()));
+                        identifier.getDatabaseName()));
         checkState(
-                objectPath.getObjectName().equals(objectPath.getObjectName().toLowerCase()),
+                identifier.getObjectName().equals(identifier.getObjectName().toLowerCase()),
                 String.format(
-                        "Table name[%s] cannot contain upper case", objectPath.getObjectName()));
+                        "Table name[%s] cannot contain upper case", identifier.getObjectName()));
     }
 
     private void checkFieldNamesUpperCase(List<String> fieldNames) {
@@ -303,13 +304,13 @@ public class HiveCatalog extends AbstractCatalog {
         return database;
     }
 
-    private Table newHmsTable(ObjectPath tablePath) {
+    private Table newHmsTable(Identifier identifier) {
         long currentTimeMillis = System.currentTimeMillis();
         final TableType tableType = hiveConf.getEnum(TABLE_TYPE.key(), TableType.MANAGED);
         Table table =
                 new Table(
-                        tablePath.getObjectName(),
-                        tablePath.getDatabaseName(),
+                        identifier.getObjectName(),
+                        identifier.getDatabaseName(),
                         // current linux user
                         System.getProperty("user.name"),
                         (int) (currentTimeMillis / 1000),
@@ -329,19 +330,20 @@ public class HiveCatalog extends AbstractCatalog {
         return table;
     }
 
-    private void updateHmsTable(Table table, ObjectPath tablePath, TableSchema schema) {
-        StorageDescriptor sd = convertToStorageDescriptor(tablePath, schema);
+    private void updateHmsTable(Table table, Identifier identifier, TableSchema schema) {
+        StorageDescriptor sd = convertToStorageDescriptor(identifier, schema);
         table.setSd(sd);
     }
 
-    private StorageDescriptor convertToStorageDescriptor(ObjectPath tablePath, TableSchema schema) {
+    private StorageDescriptor convertToStorageDescriptor(
+            Identifier identifier, TableSchema schema) {
         StorageDescriptor sd = new StorageDescriptor();
 
         sd.setCols(
                 schema.fields().stream()
                         .map(this::convertToFieldSchema)
                         .collect(Collectors.toList()));
-        sd.setLocation(getTableLocation(tablePath).toString());
+        sd.setLocation(getTableLocation(identifier).toString());
 
         sd.setInputFormat(INPUT_FORMAT_CLASS_NAME);
         sd.setOutputFormat(OUTPUT_FORMAT_CLASS_NAME);
@@ -361,20 +363,20 @@ public class HiveCatalog extends AbstractCatalog {
                 dataField.description());
     }
 
-    private boolean tableStoreTableExists(ObjectPath tablePath) {
-        return tableStoreTableExists(tablePath, true);
+    private boolean tableStoreTableExists(Identifier identifier) {
+        return tableStoreTableExists(identifier, true);
     }
 
-    private boolean tableStoreTableExists(ObjectPath tablePath, boolean throwException) {
+    private boolean tableStoreTableExists(Identifier identifier, boolean throwException) {
         Table table;
         try {
-            table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+            table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
         } catch (NoSuchObjectException e) {
             return false;
         } catch (TException e) {
             throw new RuntimeException(
                     "Cannot determine if table "
-                            + tablePath.getFullName()
+                            + identifier.getFullName()
                             + " is a table store table.",
                     e);
         }
@@ -384,7 +386,7 @@ public class HiveCatalog extends AbstractCatalog {
             if (throwException) {
                 throw new IllegalArgumentException(
                         "Table "
-                                + tablePath.getFullName()
+                                + identifier.getFullName()
                                 + " is not a table store table. It's input 
format is "
                                 + table.getSd().getInputFormat()
                                 + " and its output format is "
@@ -396,19 +398,19 @@ public class HiveCatalog extends AbstractCatalog {
         return true;
     }
 
-    private SchemaManager schemaManager(ObjectPath tablePath) {
-        checkObjectPathUpperCase(tablePath);
-        return new SchemaManager(getTableLocation(tablePath)).withLock(lock(tablePath));
+    private SchemaManager schemaManager(Identifier identifier) {
+        checkIdentifierUpperCase(identifier);
+        return new SchemaManager(getTableLocation(identifier)).withLock(lock(identifier));
     }
 
-    private Lock lock(ObjectPath tablePath) {
+    private Lock lock(Identifier identifier) {
         if (!lockEnabled()) {
             return new Lock.EmptyLock();
         }
 
         HiveCatalogLock lock =
                 new HiveCatalogLock(client, checkMaxSleep(hiveConf), acquireTimeout(hiveConf));
-        return Lock.fromCatalog(lock, tablePath);
+        return Lock.fromCatalog(lock, identifier);
     }
 
     static IMetaStoreClient createClient(HiveConf hiveConf, String clientClassName) {
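The renamed check keeps Hive's lower-case convention; a sketch of what the
private helper now enforces (hypothetical names, shown as if called from
inside HiveCatalog):

    // Passes: database and table are both lower case.
    checkIdentifierUpperCase(new Identifier("default", "orders"));
    // Fails with "Database name[Default] cannot contain upper case".
    checkIdentifierUpperCase(new Identifier("Default", "orders"));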
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 4c727f15..6fe32bd3 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.catalog.CatalogFactory;
 import org.apache.flink.table.store.file.operation.Lock;
@@ -202,10 +201,9 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
     @Override
     public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
         try {
-            ObjectPath path = objectPath(ident);
             return new SparkTable(
-                    catalog.getTable(path),
-                    Lock.factory(catalog.lockFactory().orElse(null), path),
+                    catalog.getTable(toIdentifier(ident)),
+                    Lock.factory(catalog.lockFactory().orElse(null), toIdentifier(ident)),
                     conf);
         } catch (Catalog.TableNotExistException e) {
             throw new NoSuchTableException(ident);
@@ -217,7 +215,7 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         List<SchemaChange> schemaChanges =
                 Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList());
         try {
-            catalog.alterTable(objectPath(ident), schemaChanges, false);
+            catalog.alterTable(toIdentifier(ident), schemaChanges, false);
             return loadTable(ident);
         } catch (Catalog.TableNotExistException e) {
             throw new NoSuchTableException(ident);
@@ -233,7 +231,7 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
             throws TableAlreadyExistsException, NoSuchNamespaceException {
         try {
             catalog.createTable(
-                    objectPath(ident), toUpdateSchema(schema, partitions, properties), false);
+                    toIdentifier(ident), toUpdateSchema(schema, partitions, properties), false);
             return loadTable(ident);
         } catch (Catalog.TableAlreadyExistException e) {
             throw new TableAlreadyExistsException(ident);
@@ -247,7 +245,7 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
     @Override
     public boolean dropTable(Identifier ident) {
         try {
-            catalog.dropTable(objectPath(ident), false);
+            catalog.dropTable(toIdentifier(ident), false);
             return true;
         } catch (Catalog.TableNotExistException | NoSuchTableException e) {
             return false;
@@ -341,12 +339,14 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         return namespace.length == 1;
     }
 
-    private ObjectPath objectPath(Identifier ident) throws NoSuchTableException {
+    private org.apache.flink.table.store.file.catalog.Identifier toIdentifier(Identifier ident)
+            throws NoSuchTableException {
         if (!isValidateNamespace(ident.namespace())) {
             throw new NoSuchTableException(ident);
         }
 
-        return new ObjectPath(ident.namespace()[0], ident.name());
+        return new org.apache.flink.table.store.file.catalog.Identifier(
+                ident.namespace()[0], ident.name());
     }
 
     // --------------------- unsupported methods ----------------------------

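Finally, a sketch of the Spark-side conversion (hedged: Identifier.of here
is Spark's org.apache.spark.sql.connector.catalog.Identifier factory,
which this file imports as Identifier):

    // A single-level namespace maps to the table store identifier
    // "default.orders" via toIdentifier(...).
    Identifier sparkIdent = Identifier.of(new String[] {"default"}, "orders");

    // A multi-level namespace fails the isValidateNamespace check, and
    // toIdentifier throws NoSuchTableException instead.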