This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 938d3323 [FLINK-30706] Remove flink-table-common dependency for table store core
938d3323 is described below
commit 938d3323e39048b22b693716533910e66510ac9b
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jan 17 17:39:50 2023 +0800
[FLINK-30706] Remove flink-table-common dependency for table store core
This closes #485
---
flink-table-store-codegen/pom.xml | 2 +-
.../apache/flink/table/store/codegen/SortSpec.java | 4 +-
.../CodeGeneratorContext.scala | 2 +-
.../SortCodeGenerator.scala | 2 +-
flink-table-store-common/pom.xml | 7 -
.../flink/table/store/format/FileFormat.java | 3 +-
.../apache/flink/table/store/types/BinaryType.java | 4 +-
.../apache/flink/table/store/types/CharType.java | 4 +-
.../apache/flink/table/store/types/DataField.java | 4 +-
.../table/store/types/DataTypeJsonParser.java | 8 +-
.../flink/table/store/types/DecimalType.java | 6 +-
.../table/store/types/LocalZonedTimestampType.java | 4 +-
.../apache/flink/table/store/types/RowType.java | 5 +-
.../apache/flink/table/store/types/TimeType.java | 4 +-
.../flink/table/store/types/TimestampType.java | 4 +-
.../flink/table/store/types/VarBinaryType.java | 4 +-
.../flink/table/store/types/VarCharType.java | 4 +-
.../flink/table/store/utils/EncodingUtils.java | 39 ++++
.../flink/table/store/utils/InstantiationUtil.java | 199 +++++++++++++++++++++
.../apache/flink/table/store/utils/TypeUtils.java | 5 +-
.../table/store/datagen/DataGenVisitorBase.java | 3 +-
.../store/datagen/RandomGeneratorVisitor.java | 3 +-
.../flink/table/store/types/DataTypesTest.java | 6 +-
.../flink/table/store/connector/FlinkCatalog.java | 27 +--
.../table/store/connector/sink/TableStoreSink.java | 8 +-
.../store/connector/AppendOnlyTableITCase.java | 3 +-
.../table/store/connector/FlinkCatalogTest.java | 5 +-
.../store/connector/ReadWriteTableITCase.java | 3 +-
.../table/store/connector/RescaleBucketITCase.java | 3 +-
flink-table-store-core/pom.xml | 7 -
.../org/apache/flink/table/store/CoreOptions.java | 3 +-
.../table/store/file/catalog/AbstractCatalog.java | 25 ++-
.../flink/table/store/file/catalog/Catalog.java | 69 ++++---
.../store/file/catalog/FileSystemCatalog.java | 39 ++--
.../flink/table/store/file/catalog/Identifier.java | 88 +++++++++
.../compact/aggregate/FieldAggregator.java | 3 +-
.../file/operation/AbstractFileStoreScan.java | 3 +-
.../flink/table/store/file/operation/Lock.java | 14 +-
.../store/file/utils/FileStorePathFactory.java | 1 -
.../table/store/file/utils/PartitionPathUtils.java | 123 +++++++++++++
.../ContinuousDataFileSnapshotEnumerator.java | 3 +-
.../store/file/schema/DataTypeJsonParserTest.java | 3 +-
.../table/store/file/schema/SchemaManagerTest.java | 3 +-
flink-table-store-format/pom.xml | 11 +-
.../store/format/avro/AvroSchemaConverter.java | 7 +-
.../apache/flink/table/store/hive/HiveCatalog.java | 106 +++++------
.../flink/table/store/spark/SparkCatalog.java | 18 +-
47 files changed, 649 insertions(+), 252 deletions(-)
diff --git a/flink-table-store-codegen/pom.xml
b/flink-table-store-codegen/pom.xml
index 3862c261..aaf0539d 100644
--- a/flink-table-store-codegen/pom.xml
+++ b/flink-table-store-codegen/pom.xml
@@ -40,7 +40,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-common</artifactId>
+ <artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
diff --git
a/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/SortSpec.java
b/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/SortSpec.java
index 75e23270..fb2dbbb2 100644
---
a/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/SortSpec.java
+++
b/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/SortSpec.java
@@ -23,8 +23,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/** {@link SortSpec} describes how the data will be sorted. */
public class SortSpec {
@@ -34,7 +32,7 @@ public class SortSpec {
private final SortFieldSpec[] fieldSpecs;
public SortSpec(SortFieldSpec[] fieldSpecs) {
- this.fieldSpecs = checkNotNull(fieldSpecs);
+ this.fieldSpecs = fieldSpecs;
}
/** Gets all {@link SortFieldSpec} in the SortSpec. */
diff --git
a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/CodeGeneratorContext.scala
b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/CodeGeneratorContext.scala
index e3d3bcc1..98b9ec27 100644
---
a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/CodeGeneratorContext.scala
+++
b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/CodeGeneratorContext.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.store.codegen
import org.apache.flink.table.store.codegen.GenerateUtils.{newName, newNames}
import org.apache.flink.table.store.data.InternalSerializers
import org.apache.flink.table.store.types.DataType
-import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.table.store.utils.InstantiationUtil
import scala.collection.mutable
diff --git
a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala
b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala
index b5e322e5..144bb14a 100644
---
a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala
+++
b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala
@@ -42,7 +42,7 @@ class SortCodeGenerator(val input: RowType, val sortSpec:
SortSpec) {
/** Chunks for long, int, short, byte */
private val POSSIBLE_CHUNK_SIZES = Array(8, 4, 2, 1)
- /** For get${operator} set${operator} of
[[org.apache.flink.core.memory.MemorySegment]] */
+ /** For get${operator} set${operator} of MemorySegment. */
private val BYTE_OPERATOR_MAPPING = Map(8 -> "Long", 4 -> "Int", 2 ->
"Short", 1 -> "")
/** For primitive define */
diff --git a/flink-table-store-common/pom.xml b/flink-table-store-common/pom.xml
index c27e01a9..a2c9a87a 100644
--- a/flink-table-store-common/pom.xml
+++ b/flink-table-store-common/pom.xml
@@ -32,13 +32,6 @@ under the License.
<name>Flink Table Store : Common</name>
<dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-common</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-code-splitter</artifactId>
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
index 97f73486..e9baeda8 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.store.data.InternalRow;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.types.RowType;
@@ -98,7 +97,7 @@ public abstract class FileFormat {
fromIdentifier(identifier, options,
FileFormat.class.getClassLoader())
.orElseThrow(
() ->
- new ValidationException(
+ new RuntimeException(
String.format(
"Could not
find any factories that implement '%s' in the classpath.",
FileFormatFactory.class
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/BinaryType.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/BinaryType.java
index 656bc29f..1a7a1e9c 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/BinaryType.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/BinaryType.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.types;
-import org.apache.flink.table.api.ValidationException;
-
import java.util.Objects;
/** Data type of a fixed-length binary string (=a sequence of bytes). */
@@ -40,7 +38,7 @@ public class BinaryType extends DataType {
public BinaryType(boolean isNullable, int length) {
super(isNullable, DataTypeRoot.BINARY);
if (length < MIN_LENGTH) {
- throw new ValidationException(
+ throw new IllegalArgumentException(
String.format(
"Binary string length must be between %d and %d
(both inclusive).",
MIN_LENGTH, MAX_LENGTH));
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/CharType.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/CharType.java
index a6b0e17b..95ce0331 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/CharType.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/CharType.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.types;
-import org.apache.flink.table.api.ValidationException;
-
import java.util.Objects;
/**
@@ -44,7 +42,7 @@ public class CharType extends DataType {
public CharType(boolean isNullable, int length) {
super(isNullable, DataTypeRoot.CHAR);
if (length < MIN_LENGTH) {
- throw new ValidationException(
+ throw new IllegalArgumentException(
String.format(
"Character string length must be between %d and %d
(both inclusive).",
MIN_LENGTH, MAX_LENGTH));
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataField.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataField.java
index 499b18a0..ee88b44c 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataField.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataField.java
@@ -26,8 +26,8 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.Objects;
-import static org.apache.flink.table.utils.EncodingUtils.escapeIdentifier;
-import static org.apache.flink.table.utils.EncodingUtils.escapeSingleQuotes;
+import static
org.apache.flink.table.store.utils.EncodingUtils.escapeIdentifier;
+import static
org.apache.flink.table.store.utils.EncodingUtils.escapeSingleQuotes;
/** Defines the field of a row type. */
public final class DataField implements Serializable {
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeJsonParser.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeJsonParser.java
index e2e273e7..60c172b7 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeJsonParser.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeJsonParser.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.types;
-import org.apache.flink.table.api.ValidationException;
-
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import javax.annotation.Nullable;
@@ -360,15 +358,15 @@ public final class DataTypeJsonParser {
return tokens.get(lastValidToken).cursorPosition + 1;
}
- private ValidationException parsingError(String cause, @Nullable
Throwable t) {
- return new ValidationException(
+ private IllegalArgumentException parsingError(String cause, @Nullable
Throwable t) {
+ return new IllegalArgumentException(
String.format(
"Could not parse type at position %d: %s\n Input
type string: %s",
lastCursor(), cause, inputString),
t);
}
- private ValidationException parsingError(String cause) {
+ private IllegalArgumentException parsingError(String cause) {
return parsingError(cause, null);
}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DecimalType.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DecimalType.java
index d5c71104..bff82492 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DecimalType.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DecimalType.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.types;
-import org.apache.flink.table.api.ValidationException;
-
import java.util.Objects;
/** Data type of a decimal number with fixed precision and scale. */
@@ -46,13 +44,13 @@ public class DecimalType extends DataType {
public DecimalType(boolean isNullable, int precision, int scale) {
super(isNullable, DataTypeRoot.DECIMAL);
if (precision < MIN_PRECISION || precision > MAX_PRECISION) {
- throw new ValidationException(
+ throw new IllegalArgumentException(
String.format(
"Decimal precision must be between %d and %d (both
inclusive).",
MIN_PRECISION, MAX_PRECISION));
}
if (scale < MIN_SCALE || scale > precision) {
- throw new ValidationException(
+ throw new IllegalArgumentException(
String.format(
"Decimal scale must be between %d and the
precision %d (both inclusive).",
MIN_SCALE, precision));
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/LocalZonedTimestampType.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/LocalZonedTimestampType.java
index 41726a21..dcad9b39 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/LocalZonedTimestampType.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/LocalZonedTimestampType.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.types;
-import org.apache.flink.table.api.ValidationException;
-
import java.util.Objects;
/**
@@ -53,7 +51,7 @@ public final class LocalZonedTimestampType extends DataType {
public LocalZonedTimestampType(boolean isNullable, int precision) {
super(isNullable, DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
if (precision < MIN_PRECISION || precision > MAX_PRECISION) {
- throw new ValidationException(
+ throw new IllegalArgumentException(
String.format(
"Timestamp with local time zone precision must be
between %d and %d (both inclusive).",
MIN_PRECISION, MAX_PRECISION));
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/RowType.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/RowType.java
index 34ea1067..ee60253f 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/RowType.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/RowType.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.types;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -140,7 +139,7 @@ public final class RowType extends DataType {
final List<String> fieldNames =
fields.stream().map(DataField::name).collect(Collectors.toList());
if (fieldNames.stream().anyMatch(StringUtils::isNullOrWhitespaceOnly))
{
- throw new ValidationException(
+ throw new IllegalArgumentException(
"Field names must contain at least one non-whitespace
character.");
}
final Set<String> duplicates =
@@ -148,7 +147,7 @@ public final class RowType extends DataType {
.filter(n -> Collections.frequency(fieldNames, n) > 1)
.collect(Collectors.toSet());
if (!duplicates.isEmpty()) {
- throw new ValidationException(
+ throw new IllegalArgumentException(
String.format("Field names must be unique. Found
duplicates: %s", duplicates));
}
}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimeType.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimeType.java
index 91186d3e..239e0b5c 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimeType.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimeType.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.types;
-import org.apache.flink.table.api.ValidationException;
-
import java.util.Objects;
/**
@@ -49,7 +47,7 @@ public final class TimeType extends DataType {
public TimeType(boolean isNullable, int precision) {
super(isNullable, DataTypeRoot.TIME_WITHOUT_TIME_ZONE);
if (precision < MIN_PRECISION || precision > MAX_PRECISION) {
- throw new ValidationException(
+ throw new IllegalArgumentException(
String.format(
"Time precision must be between %d and %d (both
inclusive).",
MIN_PRECISION, MAX_PRECISION));
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimestampType.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimestampType.java
index 19a26169..84c8d4b0 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimestampType.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/TimestampType.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.types;
-import org.apache.flink.table.api.ValidationException;
-
import java.util.Objects;
/**
@@ -48,7 +46,7 @@ public class TimestampType extends DataType {
public TimestampType(boolean isNullable, int precision) {
super(isNullable, DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
if (precision < MIN_PRECISION || precision > MAX_PRECISION) {
- throw new ValidationException(
+ throw new IllegalArgumentException(
String.format(
"Timestamp precision must be between %d and %d
(both inclusive).",
MIN_PRECISION, MAX_PRECISION));
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarBinaryType.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarBinaryType.java
index 2d40e648..1a576eeb 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarBinaryType.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarBinaryType.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.types;
-import org.apache.flink.table.api.ValidationException;
-
import java.util.Objects;
/** Data type of a variable-length binary string (=a sequence of bytes). */
@@ -42,7 +40,7 @@ public final class VarBinaryType extends DataType {
public VarBinaryType(boolean isNullable, int length) {
super(isNullable, DataTypeRoot.VARBINARY);
if (length < MIN_LENGTH) {
- throw new ValidationException(
+ throw new IllegalArgumentException(
String.format(
"Variable binary string length must be between %d
and %d (both inclusive).",
MIN_LENGTH, MAX_LENGTH));
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarCharType.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarCharType.java
index c6f0f383..0b6c74ca 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarCharType.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/VarCharType.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.types;
-import org.apache.flink.table.api.ValidationException;
-
import java.util.Objects;
/**
@@ -48,7 +46,7 @@ public final class VarCharType extends DataType {
public VarCharType(boolean isNullable, int length) {
super(isNullable, DataTypeRoot.VARCHAR);
if (length < MIN_LENGTH) {
- throw new ValidationException(
+ throw new IllegalArgumentException(
String.format(
"Variable character string length must be between
%d and %d (both inclusive).",
MIN_LENGTH, MAX_LENGTH));
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/EncodingUtils.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/EncodingUtils.java
new file mode 100644
index 00000000..2f8b3394
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/EncodingUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+/** General utilities for string-encoding. */
+public abstract class EncodingUtils {
+
+ private EncodingUtils() {
+ // do not instantiate
+ }
+
+ public static String escapeBackticks(String s) {
+ return s.replace("`", "``");
+ }
+
+ public static String escapeSingleQuotes(String s) {
+ return s.replace("'", "''");
+ }
+
+ public static String escapeIdentifier(String s) {
+ return "`" + escapeBackticks(s) + "`";
+ }
+}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/InstantiationUtil.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/InstantiationUtil.java
new file mode 100644
index 00000000..6b72750a
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/InstantiationUtil.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.Serializable;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
+
+/** Utility class to create instances from class objects and checking failure reasons. */
+public final class InstantiationUtil {
+
+ public static <T> T deserializeObject(byte[] bytes, ClassLoader cl)
+ throws IOException, ClassNotFoundException {
+
+ return deserializeObject(new ByteArrayInputStream(bytes), cl);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T> T deserializeObject(InputStream in, ClassLoader cl)
+ throws IOException, ClassNotFoundException {
+
+ final ClassLoader old = Thread.currentThread().getContextClassLoader();
+ // not using resource try to avoid AutoClosable's close() on the given stream
+ try {
+ ObjectInputStream oois = new InstantiationUtil.ClassLoaderObjectInputStream(in, cl);
+ Thread.currentThread().setContextClassLoader(cl);
+ return (T) oois.readObject();
+ } finally {
+ Thread.currentThread().setContextClassLoader(old);
+ }
+ }
+
+ public static byte[] serializeObject(Object o) throws IOException {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(o);
+ oos.flush();
+ return baos.toByteArray();
+ }
+ }
+
+ /**
+ * Clones the given serializable object using Java serialization.
+ *
+ * @param obj Object to clone
+ * @param <T> Type of the object to clone
+ * @return The cloned object
+ * @throws IOException Thrown if the serialization or deserialization process fails.
+ * @throws ClassNotFoundException Thrown if any of the classes referenced by the object cannot
+ * be resolved during deserialization.
+ */
+ public static <T extends Serializable> T clone(T obj)
+ throws IOException, ClassNotFoundException {
+ if (obj == null) {
+ return null;
+ } else {
+ return clone(obj, obj.getClass().getClassLoader());
+ }
+ }
+
+ /**
+ * Clones the given serializable object using Java serialization, using the given classloader to
+ * resolve the cloned classes.
+ *
+ * @param obj Object to clone
+ * @param classLoader The classloader to resolve the classes during deserialization.
+ * @param <T> Type of the object to clone
+ * @return Cloned object
+ * @throws IOException Thrown if the serialization or deserialization process fails.
+ * @throws ClassNotFoundException Thrown if any of the classes referenced by the object cannot
+ * be resolved during deserialization.
+ */
+ public static <T extends Serializable> T clone(T obj, ClassLoader classLoader)
+ throws IOException, ClassNotFoundException {
+ if (obj == null) {
+ return null;
+ } else {
+ final byte[] serializedObject = serializeObject(obj);
+ return deserializeObject(serializedObject, classLoader);
+ }
+ }
+
+ /** A custom ObjectInputStream that can load classes using a specific ClassLoader. */
+ public static class ClassLoaderObjectInputStream extends ObjectInputStream {
+
+ protected final ClassLoader classLoader;
+
+ public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader)
+ throws IOException {
+ super(in);
+ this.classLoader = classLoader;
+ }
+
+ @Override
+ protected Class<?> resolveClass(ObjectStreamClass desc)
+ throws IOException, ClassNotFoundException {
+ if (classLoader != null) {
+ String name = desc.getName();
+ try {
+ return Class.forName(name, false, classLoader);
+ } catch (ClassNotFoundException ex) {
+ // check if class is a primitive class
+ Class<?> cl = primitiveClasses.get(name);
+ if (cl != null) {
+ // return primitive class
+ return cl;
+ } else {
+ // throw ClassNotFoundException
+ throw ex;
+ }
+ }
+ }
+
+ return super.resolveClass(desc);
+ }
+
+ @Override
+ protected Class<?> resolveProxyClass(String[] interfaces)
+ throws IOException, ClassNotFoundException {
+ if (classLoader != null) {
+ ClassLoader nonPublicLoader = null;
+ boolean hasNonPublicInterface = false;
+
+ // define proxy in class loader of non-public interface(s), if any
+ Class<?>[] classObjs = new Class<?>[interfaces.length];
+ for (int i = 0; i < interfaces.length; i++) {
+ Class<?> cl = Class.forName(interfaces[i], false, classLoader);
+ if ((cl.getModifiers() & Modifier.PUBLIC) == 0) {
+ if (hasNonPublicInterface) {
+ if (nonPublicLoader != cl.getClassLoader()) {
+ throw new IllegalAccessError(
+ "conflicting non-public interface class loaders");
+ }
+ } else {
+ nonPublicLoader = cl.getClassLoader();
+ hasNonPublicInterface = true;
+ }
+ }
+ classObjs[i] = cl;
+ }
+ try {
+ return Proxy.getProxyClass(
+ hasNonPublicInterface ? nonPublicLoader : classLoader, classObjs);
+ } catch (IllegalArgumentException e) {
+ throw new ClassNotFoundException(null, e);
+ }
+ }
+
+ return super.resolveProxyClass(interfaces);
+ }
+
+ // ------------------------------------------------
+
+ private static final HashMap<String, Class<?>> primitiveClasses = new HashMap<>(9);
+
+ static {
+ primitiveClasses.put("boolean", boolean.class);
+ primitiveClasses.put("byte", byte.class);
+ primitiveClasses.put("char", char.class);
+ primitiveClasses.put("short", short.class);
+ primitiveClasses.put("int", int.class);
+ primitiveClasses.put("long", long.class);
+ primitiveClasses.put("float", float.class);
+ primitiveClasses.put("double", double.class);
+ primitiveClasses.put("void", void.class);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /** Private constructor to prevent instantiation. */
+ private InstantiationUtil() {
+ throw new RuntimeException();
+ }
+}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/TypeUtils.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/TypeUtils.java
index 8c0c4f0c..c500b206 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/TypeUtils.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/TypeUtils.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.utils;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.store.data.BinaryString;
import org.apache.flink.table.store.data.Decimal;
import org.apache.flink.table.store.data.Timestamp;
@@ -112,7 +111,7 @@ public class TypeUtils {
}
/** Parse a {@link BinaryString} to boolean. */
- public static boolean toBoolean(BinaryString str) throws TableException {
+ public static boolean toBoolean(BinaryString str) {
BinaryString lowerCase = str.toLowerCase();
if (TRUE_STRINGS.contains(lowerCase)) {
return true;
@@ -120,7 +119,7 @@ public class TypeUtils {
if (FALSE_STRINGS.contains(lowerCase)) {
return false;
}
- throw new TableException("Cannot parse '" + str + "' as BOOLEAN.");
+ throw new RuntimeException("Cannot parse '" + str + "' as BOOLEAN.");
}
public static int toDate(BinaryString input) throws DateTimeException {
diff --git
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/DataGenVisitorBase.java
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/DataGenVisitorBase.java
index 07021635..a97be0e4 100644
---
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/DataGenVisitorBase.java
+++
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/DataGenVisitorBase.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.datagen;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.store.types.DataType;
import org.apache.flink.table.store.types.DataTypeDefaultVisitor;
import org.apache.flink.table.store.types.DateType;
@@ -59,7 +58,7 @@ public abstract class DataGenVisitorBase extends
DataTypeDefaultVisitor<DataGene
@Override
protected DataGeneratorContainer defaultMethod(DataType dataType) {
- throw new ValidationException("Unsupported type: " + dataType);
+ throw new RuntimeException("Unsupported type: " + dataType);
}
private interface SerializableSupplier<T> extends Supplier<T>,
Serializable {}
diff --git
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/RandomGeneratorVisitor.java
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/RandomGeneratorVisitor.java
index fcc755bc..5d91e039 100644
---
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/RandomGeneratorVisitor.java
+++
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/datagen/RandomGeneratorVisitor.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.store.data.BinaryString;
import org.apache.flink.table.store.data.Decimal;
import org.apache.flink.table.store.data.GenericArray;
@@ -318,7 +317,7 @@ public class RandomGeneratorVisitor extends
DataGenVisitorBase {
@Override
protected DataGeneratorContainer defaultMethod(DataType dataType) {
- throw new ValidationException("Unsupported type: " + dataType);
+ throw new RuntimeException("Unsupported type: " + dataType);
}
private static RandomGenerator<BinaryString> getRandomStringGenerator(int
length) {
diff --git
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/types/DataTypesTest.java
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/types/DataTypesTest.java
index 08f553ee..b986d9b7 100644
---
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/types/DataTypesTest.java
+++
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/types/DataTypesTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.types;
-import org.apache.flink.table.api.ValidationException;
-
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
@@ -198,14 +196,14 @@ public class DataTypesTest {
new DataField(1, "b", new
VarCharType()),
new DataField(2, "a", new
VarCharType()),
new DataField(3, "a", new
TimestampType()))))
- .isInstanceOf(ValidationException.class);
+ .isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(
() ->
new RowType(
Collections.singletonList(
new DataField(0, "", new
VarCharType()))))
- .isInstanceOf(ValidationException.class);
+ .isInstanceOf(IllegalArgumentException.class);
}
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index bf915466..0293d744 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -45,6 +45,7 @@ import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.catalog.Identifier;
import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.table.FileStoreTable;
@@ -159,9 +160,9 @@ public class FlinkCatalog extends AbstractCatalog {
throws TableNotExistException, CatalogException {
Table table;
try {
- table = catalog.getTable(tablePath);
+ table = catalog.getTable(toIdentifier(tablePath));
} catch (Catalog.TableNotExistException e) {
- throw new TableNotExistException(getName(), e.tablePath());
+ throw new TableNotExistException(getName(), tablePath);
}
if (table instanceof FileStoreTable) {
@@ -170,7 +171,7 @@ public class FlinkCatalog extends AbstractCatalog {
// add path to source and sink
catalogTable
.getOptions()
- .put(PATH.key(),
catalog.getTableLocation(tablePath).toString());
+ .put(PATH.key(),
catalog.getTableLocation(toIdentifier(tablePath)).toString());
return catalogTable;
} else {
return new SystemCatalogTable(table);
@@ -179,16 +180,16 @@ public class FlinkCatalog extends AbstractCatalog {
@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
- return catalog.tableExists(tablePath);
+ return catalog.tableExists(toIdentifier(tablePath));
}
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
try {
- catalog.dropTable(tablePath, ignoreIfNotExists);
+ catalog.dropTable(toIdentifier(tablePath), ignoreIfNotExists);
} catch (Catalog.TableNotExistException e) {
- throw new TableNotExistException(getName(), e.tablePath());
+ throw new TableNotExistException(getName(), tablePath);
}
}
@@ -217,9 +218,11 @@ public class FlinkCatalog extends AbstractCatalog {
try {
catalog.createTable(
- tablePath, FlinkCatalog.fromCatalogTable(catalogTable),
ignoreIfExists);
+ toIdentifier(tablePath),
+ FlinkCatalog.fromCatalogTable(catalogTable),
+ ignoreIfExists);
} catch (Catalog.TableAlreadyExistException e) {
- throw new TableAlreadyExistException(getName(), e.tablePath());
+ throw new TableAlreadyExistException(getName(), tablePath);
} catch (Catalog.DatabaseNotExistException e) {
throw new DatabaseNotExistException(getName(), e.database());
}
@@ -265,9 +268,9 @@ public class FlinkCatalog extends AbstractCatalog {
});
try {
- catalog.alterTable(tablePath, changes, ignoreIfNotExists);
+ catalog.alterTable(toIdentifier(tablePath), changes,
ignoreIfNotExists);
} catch (Catalog.TableNotExistException e) {
- throw new TableNotExistException(getName(), e.tablePath());
+ throw new TableNotExistException(getName(), tablePath);
}
}
@@ -372,6 +375,10 @@ public class FlinkCatalog extends AbstractCatalog {
catalogTable.getComment());
}
+ public static Identifier toIdentifier(ObjectPath path) {
+ return new Identifier(path.getDatabaseName(), path.getObjectName());
+ }
+
// --------------------- unsupported methods ----------------------------
@Override
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 27b4aaf3..4abf0ee6 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -28,6 +28,7 @@ import
org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.connector.FlinkCatalog;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider;
import org.apache.flink.table.store.file.catalog.CatalogLock;
@@ -49,7 +50,7 @@ import java.util.Map;
import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
-/** Table sink to create {@link StoreSink}. */
+/** Table sink to create sink. */
public class TableStoreSink implements DynamicTableSink, SupportsOverwrite,
SupportsPartitioning {
private final ObjectIdentifier tableIdentifier;
@@ -129,7 +130,10 @@ public class TableStoreSink implements DynamicTableSink,
SupportsOverwrite, Supp
dataStream.getExecutionEnvironment(),
dataStream.getTransformation()))
.withLockFactory(
- Lock.factory(lockFactory,
tableIdentifier.toObjectPath()))
+ Lock.factory(
+ lockFactory,
+ FlinkCatalog.toIdentifier(
+
tableIdentifier.toObjectPath())))
.withLogSinkFunction(logSinkFunction)
.withOverwritePartition(overwrite ?
staticPartitions : null)
.withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
index 354c7a5f..757934eb 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.connector;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.types.Row;
@@ -43,7 +42,7 @@ public class AppendOnlyTableITCase extends CatalogITCaseBase {
batchSql(
"CREATE TABLE pk_table (id INT PRIMARY
KEY NOT ENFORCED, data STRING) "
+ "WITH
('write-mode'='append-only')"))
- .hasRootCauseInstanceOf(TableException.class)
+ .hasRootCauseInstanceOf(RuntimeException.class)
.hasRootCauseMessage(
"Cannot define any primary key in an append-only
table. Set 'write-mode'='change-log' if still "
+ "want to keep the primary key definition.");
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
index 93271875..9aedf0d8 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
@@ -411,7 +411,10 @@ public class FlinkCatalogTest {
}
private void checkEquals(ObjectPath path, CatalogTable t1, CatalogTable
t2) {
- Path tablePath = ((FlinkCatalog)
catalog).catalog().getTableLocation(path);
+ Path tablePath =
+ ((FlinkCatalog) catalog)
+ .catalog()
+ .getTableLocation(FlinkCatalog.toIdentifier(path));
Map<String, String> options = new HashMap<>(t1.getOptions());
options.put("path", tablePath.toString());
t1 = ((ResolvedCatalogTable) t1).copy(options);
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index cad59467..ffc19655 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
@@ -1107,7 +1106,7 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
.containsExactlyInAnyOrder(changelogRow("+I", "US Dollar",
102L, "2022-06-20"));
assertThatThrownBy(() -> insertInto(table, "('US Dollar', 102,
'2022-06-20')"))
.rootCause()
- .isInstanceOf(TableException.class)
+ .isInstanceOf(RuntimeException.class)
.hasMessage(
String.format(
"Try to write partition {dt=2022-06-20} with a
new bucket num %d, but the previous bucket num is 2. "
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
index 328045c4..0ecc32ed 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
@@ -194,7 +193,7 @@ public class RescaleBucketITCase extends CatalogITCaseBase {
// check write without rescale
assertThatThrownBy(() -> batchSql("INSERT INTO %s VALUES (6)",
tableName))
.getRootCause()
- .isInstanceOf(TableException.class)
+ .isInstanceOf(RuntimeException.class)
.hasMessage(
"Try to write table with a new bucket num 4, but the
previous bucket num is 2. "
+ "Please switch to batch mode, and perform
INSERT OVERWRITE to rescale current data layout first.");
diff --git a/flink-table-store-core/pom.xml b/flink-table-store-core/pom.xml
index 478dc980..3b54e349 100644
--- a/flink-table-store-core/pom.xml
+++ b/flink-table-store-core/pom.xml
@@ -53,13 +53,6 @@ under the License.
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-common</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index aedfddd1..6eba7099 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -29,7 +29,6 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.format.FileFormat;
@@ -847,7 +846,7 @@ public class CoreOptions implements Serializable {
// Cannot define any primary key in an append-only table.
if (!schema.primaryKeys().isEmpty() && Objects.equals(APPEND_ONLY,
options.writeMode())) {
- throw new TableException(
+ throw new RuntimeException(
"Cannot define any primary key in an append-only table.
Set 'write-mode'='change-log' if "
+ "still want to keep the primary key
definition.");
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
index 4b7a7b70..a356ee76 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.file.catalog;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.store.table.Table;
@@ -33,19 +32,19 @@ public abstract class AbstractCatalog implements Catalog {
protected static final String DB_SUFFIX = ".db";
@Override
- public Path getTableLocation(ObjectPath tablePath) {
- if (tablePath.getObjectName().contains(SYSTEM_TABLE_SPLITTER)) {
+ public Path getTableLocation(Identifier identifier) {
+ if (identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER)) {
throw new IllegalArgumentException(
String.format(
"Table name[%s] cannot contain '%s' separator",
- tablePath.getObjectName(), SYSTEM_TABLE_SPLITTER));
+ identifier.getObjectName(),
SYSTEM_TABLE_SPLITTER));
}
- return new Path(databasePath(tablePath.getDatabaseName()),
tablePath.getObjectName());
+ return new Path(databasePath(identifier.getDatabaseName()),
identifier.getObjectName());
}
@Override
- public Table getTable(ObjectPath tablePath) throws TableNotExistException {
- String inputTableName = tablePath.getObjectName();
+ public Table getTable(Identifier identifier) throws TableNotExistException
{
+ String inputTableName = identifier.getObjectName();
if (inputTableName.contains(SYSTEM_TABLE_SPLITTER)) {
String[] splits = StringUtils.split(inputTableName,
SYSTEM_TABLE_SPLITTER);
if (splits.length != 2) {
@@ -55,15 +54,15 @@ public abstract class AbstractCatalog implements Catalog {
}
String table = splits[0];
String type = splits[1];
- ObjectPath originTablePath = new
ObjectPath(tablePath.getDatabaseName(), table);
- if (!tableExists(originTablePath)) {
- throw new TableNotExistException(tablePath);
+ Identifier originidentifier = new
Identifier(identifier.getDatabaseName(), table);
+ if (!tableExists(originidentifier)) {
+ throw new TableNotExistException(identifier);
}
- Path location = getTableLocation(originTablePath);
+ Path location = getTableLocation(originidentifier);
return SystemTableLoader.load(type, location);
} else {
- TableSchema tableSchema = getTableSchema(tablePath);
- return FileStoreTableFactory.create(getTableLocation(tablePath),
tableSchema);
+ TableSchema tableSchema = getTableSchema(identifier);
+ return FileStoreTableFactory.create(getTableLocation(identifier),
tableSchema);
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index 919f6067..5cb76ade 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.file.catalog;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -94,72 +93,72 @@ public interface Catalog extends AutoCloseable {
List<String> listTables(String databaseName) throws
DatabaseNotExistException;
/**
- * Return the table location path identified by the given {@link
ObjectPath}.
+ * Return the table location path identified by the given {@link
Identifier}.
*
- * @param tablePath Path of the table
+ * @param identifier Path of the table
* @return The requested table location
*/
- Path getTableLocation(ObjectPath tablePath);
+ Path getTableLocation(Identifier identifier);
/**
- * Return a {@link TableSchema} identified by the given {@link ObjectPath}.
+ * Return a {@link TableSchema} identified by the given {@link Identifier}.
*
- * @param tablePath Path of the table
+ * @param identifier Path of the table
* @return The requested table schema
* @throws TableNotExistException if the target does not exist
*/
- TableSchema getTableSchema(ObjectPath tablePath) throws
TableNotExistException;
+ TableSchema getTableSchema(Identifier identifier) throws
TableNotExistException;
/**
- * Return a {@link Table} identified by the given {@link ObjectPath}.
+ * Return a {@link Table} identified by the given {@link Identifier}.
*
- * @param tablePath Path of the table
+ * @param identifier Path of the table
* @return The requested table
* @throws TableNotExistException if the target does not exist
*/
- Table getTable(ObjectPath tablePath) throws TableNotExistException;
+ Table getTable(Identifier identifier) throws TableNotExistException;
/**
* Check if a table exists in this catalog.
*
- * @param tablePath Path of the table
+ * @param identifier Path of the table
* @return true if the given table exists in the catalog false otherwise
*/
- boolean tableExists(ObjectPath tablePath);
+ boolean tableExists(Identifier identifier);
/**
* Drop a table.
*
- * @param tablePath Path of the table to be dropped
+ * @param identifier Path of the table to be dropped
* @param ignoreIfNotExists Flag to specify behavior when the table does
not exist: if set to
* false, throw an exception, if set to true, do nothing.
* @throws TableNotExistException if the table does not exist
*/
- void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws
TableNotExistException;
+ void dropTable(Identifier identifier, boolean ignoreIfNotExists) throws
TableNotExistException;
/**
* Create a new table.
*
- * @param tablePath path of the table to be created
+ * @param identifier path of the table to be created
* @param tableSchema the table definition
* @param ignoreIfExists flag to specify behavior when a table already
exists at the given path:
* if set to false, it throws a TableAlreadyExistException, if set to
true, do nothing.
* @throws TableAlreadyExistException if table already exists and
ignoreIfExists is false
- * @throws DatabaseNotExistException if the database in tablePath doesn't
exist
+ * @throws DatabaseNotExistException if the database in identifier doesn't
exist
*/
- void createTable(ObjectPath tablePath, UpdateSchema tableSchema, boolean
ignoreIfExists)
+ void createTable(Identifier identifier, UpdateSchema tableSchema, boolean
ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException;
/**
* Modify an existing table from {@link SchemaChange}s.
*
- * @param tablePath path of the table to be modified
+ * @param identifier path of the table to be modified
* @param changes the schema changes
* @param ignoreIfNotExists flag to specify behavior when the table does
not exist: if set to
* false, throw an exception, if set to true, do nothing.
* @throws TableNotExistException if the table does not exist
*/
- void alterTable(ObjectPath tablePath, List<SchemaChange> changes, boolean
ignoreIfNotExists)
+ void alterTable(Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
throws TableNotExistException;
/** Exception for trying to drop on a database that is not empty. */
@@ -227,19 +226,19 @@ public interface Catalog extends AutoCloseable {
private static final String MSG = "Table %s already exists.";
- private final ObjectPath tablePath;
+ private final Identifier identifier;
- public TableAlreadyExistException(ObjectPath tablePath) {
- this(tablePath, null);
+ public TableAlreadyExistException(Identifier identifier) {
+ this(identifier, null);
}
- public TableAlreadyExistException(ObjectPath tablePath, Throwable
cause) {
- super(String.format(MSG, tablePath.getFullName()), cause);
- this.tablePath = tablePath;
+ public TableAlreadyExistException(Identifier identifier, Throwable
cause) {
+ super(String.format(MSG, identifier.getFullName()), cause);
+ this.identifier = identifier;
}
- public ObjectPath tablePath() {
- return tablePath;
+ public Identifier identifier() {
+ return identifier;
}
}
@@ -248,19 +247,19 @@ public interface Catalog extends AutoCloseable {
private static final String MSG = "Table %s does not exist.";
- private final ObjectPath tablePath;
+ private final Identifier identifier;
- public TableNotExistException(ObjectPath tablePath) {
- this(tablePath, null);
+ public TableNotExistException(Identifier identifier) {
+ this(identifier, null);
}
- public TableNotExistException(ObjectPath tablePath, Throwable cause) {
- super(String.format(MSG, tablePath.getFullName()), cause);
- this.tablePath = tablePath;
+ public TableNotExistException(Identifier identifier, Throwable cause) {
+ super(String.format(MSG, identifier.getFullName()), cause);
+ this.identifier = identifier;
}
- public ObjectPath tablePath() {
- return tablePath;
+ public Identifier identifier() {
+ return identifier;
}
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index 6dae077e..d9c54cb1 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.catalog;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
@@ -113,51 +112,51 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- public TableSchema getTableSchema(ObjectPath tablePath) throws
TableNotExistException {
- Path path = getTableLocation(tablePath);
+ public TableSchema getTableSchema(Identifier identifier) throws
TableNotExistException {
+ Path path = getTableLocation(identifier);
return new SchemaManager(path)
.latest()
- .orElseThrow(() -> new TableNotExistException(tablePath));
+ .orElseThrow(() -> new TableNotExistException(identifier));
}
@Override
- public boolean tableExists(ObjectPath tablePath) {
- return tableExists(getTableLocation(tablePath));
+ public boolean tableExists(Identifier identifier) {
+ return tableExists(getTableLocation(identifier));
}
- private boolean tableExists(Path tablePath) {
- return new SchemaManager(tablePath).listAllIds().size() > 0;
+ private boolean tableExists(Path identifier) {
+ return new SchemaManager(identifier).listAllIds().size() > 0;
}
@Override
- public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
- Path path = getTableLocation(tablePath);
+ Path path = getTableLocation(identifier);
if (!tableExists(path)) {
if (ignoreIfNotExists) {
return;
}
- throw new TableNotExistException(tablePath);
+ throw new TableNotExistException(identifier);
}
uncheck(() -> fs.delete(path, true));
}
@Override
- public void createTable(ObjectPath tablePath, UpdateSchema table, boolean
ignoreIfExists)
+ public void createTable(Identifier identifier, UpdateSchema table, boolean
ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException {
- if (!databaseExists(tablePath.getDatabaseName())) {
- throw new DatabaseNotExistException(tablePath.getDatabaseName());
+ if (!databaseExists(identifier.getDatabaseName())) {
+ throw new DatabaseNotExistException(identifier.getDatabaseName());
}
- Path path = getTableLocation(tablePath);
+ Path path = getTableLocation(identifier);
if (tableExists(path)) {
if (ignoreIfExists) {
return;
}
- throw new TableAlreadyExistException(tablePath);
+ throw new TableAlreadyExistException(identifier);
}
uncheck(() -> new SchemaManager(path).commitNewVersion(table));
@@ -165,12 +164,12 @@ public class FileSystemCatalog extends AbstractCatalog {
@Override
public void alterTable(
- ObjectPath tablePath, List<SchemaChange> changes, boolean
ignoreIfNotExists)
+ Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
throws TableNotExistException {
- if (!tableExists(tablePath)) {
- throw new TableNotExistException(tablePath);
+ if (!tableExists(identifier)) {
+ throw new TableNotExistException(identifier);
}
- uncheck(() -> new
SchemaManager(getTableLocation(tablePath)).commitChanges(changes));
+ uncheck(() -> new
SchemaManager(getTableLocation(identifier)).commitChanges(changes));
}
private static <T> T uncheck(Callable<T> callable) {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java
new file mode 100644
index 00000000..0d3735ef
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Identifies an object in a catalog. */
+public class Identifier implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String database;
+ private final String table;
+
+ public Identifier(String database, String table) {
+ this.database = database;
+ this.table = table;
+ }
+
+ public String getDatabaseName() {
+ return database;
+ }
+
+ public String getObjectName() {
+ return table;
+ }
+
+ public String getFullName() {
+ return String.format("%s.%s", database, table);
+ }
+
+ public static Identifier fromString(String fullName) {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(fullName), "fullName
cannot be null or empty");
+
+ String[] paths = fullName.split("\\.");
+
+ if (paths.length != 2) {
+ throw new IllegalArgumentException(
+ String.format("Cannot get split '%s' to get database and
table", fullName));
+ }
+
+ return new Identifier(paths[0], paths[1]);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Identifier that = (Identifier) o;
+ return Objects.equals(database, that.database) &&
Objects.equals(table, that.table);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(database, table);
+ }
+
+ @Override
+ public String toString() {
+ return "Identifier{" + "database='" + database + '\'' + ", table='" +
table + '\'' + '}';
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java
index 2a7e52e4..7625761c 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.file.mergetree.compact.aggregate;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.store.types.DataType;
import java.io.Serializable;
@@ -64,7 +63,7 @@ public abstract class FieldAggregator implements Serializable
{
fieldAggregator = new FieldBoolAndAgg(fieldType);
break;
default:
- throw new ValidationException(
+ throw new RuntimeException(
"Use unsupported aggregation or spell aggregate
function incorrectly!");
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 7dfd3dfc..79bbace2 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.file.operation;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.store.data.BinaryRow;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
@@ -219,7 +218,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
.defaultValue())
.generatePartValues(file.partition())
: "table";
- throw new TableException(
+ throw new RuntimeException(
String.format(
"Try to write %s with a new bucket num %d, but
the previous bucket num is %d. "
+ "Please switch to batch mode, and
perform INSERT OVERWRITE to rescale current data layout first.",
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
index ca3ab6cc..a4ee9c73 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
@@ -18,8 +18,8 @@
package org.apache.flink.table.store.file.operation;
-import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.table.store.file.catalog.Identifier;
import javax.annotation.Nullable;
@@ -37,7 +37,7 @@ public interface Lock extends AutoCloseable {
Lock create();
}
- static Factory factory(@Nullable CatalogLock.Factory lockFactory,
ObjectPath tablePath) {
+ static Factory factory(@Nullable CatalogLock.Factory lockFactory,
Identifier tablePath) {
return lockFactory == null
? new EmptyFactory()
: new CatalogLockFactory(lockFactory, tablePath);
@@ -53,9 +53,9 @@ public interface Lock extends AutoCloseable {
private static final long serialVersionUID = 1L;
private final CatalogLock.Factory lockFactory;
- private final ObjectPath tablePath;
+ private final Identifier tablePath;
- public CatalogLockFactory(CatalogLock.Factory lockFactory, ObjectPath
tablePath) {
+ public CatalogLockFactory(CatalogLock.Factory lockFactory, Identifier
tablePath) {
this.lockFactory = lockFactory;
this.tablePath = tablePath;
}
@@ -88,7 +88,7 @@ public interface Lock extends AutoCloseable {
public void close() {}
}
- static Lock fromCatalog(CatalogLock lock, ObjectPath tablePath) {
+ static Lock fromCatalog(CatalogLock lock, Identifier tablePath) {
if (lock == null) {
return new EmptyLock();
}
@@ -99,9 +99,9 @@ public interface Lock extends AutoCloseable {
class CatalogLockImpl implements Lock {
private final CatalogLock catalogLock;
- private final ObjectPath tablePath;
+ private final Identifier tablePath;
- private CatalogLockImpl(CatalogLock catalogLock, ObjectPath tablePath)
{
+ private CatalogLockImpl(CatalogLock catalogLock, Identifier tablePath)
{
this.catalogLock = catalogLock;
this.tablePath = tablePath;
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index 8db38280..b7e81bb4 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.data.BinaryRow;
import org.apache.flink.table.store.file.io.DataFilePathFactory;
import org.apache.flink.table.store.types.RowType;
-import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.util.Preconditions;
import javax.annotation.concurrent.ThreadSafe;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/PartitionPathUtils.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/PartitionPathUtils.java
new file mode 100644
index 00000000..49f7e333
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/PartitionPathUtils.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.fs.Path;
+
+import java.util.BitSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Utils for file system: builds partition directory paths from a partition spec and escapes
+ * characters that must not appear verbatim in a path segment.
+ *
+ * <p>A character flagged in {@code CHAR_TO_ESCAPE} is written as {@code '%'} followed by its
+ * character code as two upper-case hex digits; every other character is copied through unchanged.
+ */
+public class PartitionPathUtils {
+
+ private static final BitSet CHAR_TO_ESCAPE = new BitSet(128); // indexed by char code; a set bit means "escape this char"
+
+ static {
+ for (char c = 0; c < ' '; c++) { // flags every character below the space character
+ CHAR_TO_ESCAPE.set(c);
+ }
+
+ /*
+ * ASCII 01-1F are HTTP control characters that need to be escaped.
+ * \u000A and \u000D are \n and \r, respectively.
+ *
+ * The loop above has already flagged everything below ' ', so the control characters
+ * listed again here are redundant. The entries that add new bits are the printable
+ * characters at the end of the list (quotes, '#', '%', '*', '/', ':', '=', '?', backslash,
+ * braces, brackets, '^') and DEL (0x7F).
+ */
+ char[] clist =
+ new char[] {
+ '\u0001', '\u0002', '\u0003', '\u0004', '\u0005',
'\u0006', '\u0007', '\u0008',
+ '\u0009', '\n', '\u000B', '\u000C', '\r', '\u000E',
'\u000F', '\u0010',
+ '\u0011', '\u0012', '\u0013', '\u0014', '\u0015',
'\u0016', '\u0017', '\u0018',
+ '\u0019', '\u001A', '\u001B', '\u001C', '\u001D',
'\u001E', '\u001F', '"', '#',
+ '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F', '{',
'}', '[', ']', '^'
+ };
+
+ for (char c : clist) {
+ CHAR_TO_ESCAPE.set(c);
+ }
+ }
+
+ private static boolean needsEscaping(char c) { // chars at or beyond CHAR_TO_ESCAPE.size() are never escaped
+ return c < CHAR_TO_ESCAPE.size() && CHAR_TO_ESCAPE.get(c);
+ }
+
+ /**
+ * Make partition path from partition spec.
+ *
+ * <p>Each entry is rendered as {@code escapedKey=escapedValue}. Entries are joined with
+ * {@code Path.SEPARATOR} in the map's iteration order, and one more separator is appended
+ * after the last entry. An empty spec short-circuits to the empty string, without any
+ * separator.
+ *
+ * @param partitionSpec The partition spec.
+ * @return An escaped, valid partition name; {@code ""} when the spec is empty.
+ * @throws RuntimeException if a key or value is null or empty (raised by
+ *     {@code escapePathName}).
+ */
+ public static String generatePartitionPath(LinkedHashMap<String, String>
partitionSpec) {
+ if (partitionSpec.isEmpty()) {
+ return "";
+ }
+ StringBuilder suffixBuf = new StringBuilder();
+ int i = 0;
+ for (Map.Entry<String, String> e : partitionSpec.entrySet()) {
+ if (i > 0) {
+ suffixBuf.append(Path.SEPARATOR);
+ }
+ suffixBuf.append(escapePathName(e.getKey()));
+ suffixBuf.append('=');
+ suffixBuf.append(escapePathName(e.getValue()));
+ i++;
+ }
+ suffixBuf.append(Path.SEPARATOR); // a non-empty result always ends with a separator
+ return suffixBuf.toString();
+ }
+
+ /**
+ * Escapes a path name.
+ *
+ * <p>The input is scanned once. No {@code StringBuilder} is allocated until the first
+ * character that needs escaping is found; if none is found, the original {@code String}
+ * instance is returned as-is.
+ *
+ * @param path The path to escape.
+ * @return An escaped path name.
+ * @throws RuntimeException if {@code path} is null or empty.
+ */
+ static String escapePathName(String path) {
+ if (path == null || path.length() == 0) {
+ throw new RuntimeException("Path should not be null or empty: " +
path);
+ }
+
+ StringBuilder sb = null;
+ for (int i = 0; i < path.length(); i++) {
+ char c = path.charAt(i);
+ if (needsEscaping(c)) {
+ if (sb == null) { // first escaped char: seed the builder with the untouched prefix
+ sb = new StringBuilder(path.length() + 2);
+ for (int j = 0; j < i; j++) {
+ sb.append(path.charAt(j));
+ }
+ }
+ escapeChar(c, sb);
+ } else if (sb != null) {
+ sb.append(c);
+ }
+ }
+ if (sb == null) {
+ return path;
+ }
+ return sb.toString();
+ }
+
+ private static void escapeChar(char c, StringBuilder sb) { // appends '%' plus the upper-case hex code of c
+ sb.append('%');
+ if (c < 16) { // pad single-digit hex codes to two digits
+ sb.append('0');
+ }
+ sb.append(Integer.toHexString(c).toUpperCase());
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
index bee9b94f..feef902d 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.table.source.snapshot;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.schema.TableSchema;
@@ -191,7 +190,7 @@ public class ContinuousDataFileSnapshotEnumerator
implements SnapshotEnumerator
if (schema.primaryKeys().size() > 0
&& mergeEngineDesc.containsKey(mergeEngine)
&& options.changelogProducer() != FULL_COMPACTION) {
- throw new ValidationException(
+ throw new RuntimeException(
mergeEngineDesc.get(mergeEngine)
+ " continuous reading is not supported. "
+ "You can use full compaction changelog producer
to support streaming reading.");
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/DataTypeJsonParserTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/DataTypeJsonParserTest.java
index cbaad530..0d6dbfec 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/DataTypeJsonParserTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/DataTypeJsonParserTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.file.schema;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.types.ArrayType;
import org.apache.flink.table.store.types.BigIntType;
@@ -195,7 +194,7 @@ public class DataTypeJsonParserTest {
void testErrorMessage(TestSpec testSpec) {
if (testSpec.expectedErrorMessage != null) {
assertThatThrownBy(() -> parse(testSpec.jsonString))
- .isInstanceOf(ValidationException.class)
+ .isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(testSpec.expectedErrorMessage);
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index c11d091e..6830c5f8 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.file.schema;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
@@ -293,7 +292,7 @@ public class SchemaManagerTest {
() ->
retryArtificialException(
() ->
manager.commitNewVersion(updateSchema)))
- .isInstanceOf(TableException.class)
+ .isInstanceOf(RuntimeException.class)
.hasMessage(
"Cannot define any primary key in an append-only
table. "
+ "Set 'write-mode'='change-log' if still want
to keep the primary key definition.");
diff --git a/flink-table-store-format/pom.xml b/flink-table-store-format/pom.xml
index 3ace2d3e..abab49c9 100644
--- a/flink-table-store-format/pom.xml
+++ b/flink-table-store-format/pom.xml
@@ -44,18 +44,9 @@ under the License.
<scope>provided</scope>
</dependency>
- <!-- flink dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-common</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>${flink.table.runtime}</artifactId>
+ <artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
diff --git
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroSchemaConverter.java
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroSchemaConverter.java
index 641c495d..c8f36860 100644
---
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroSchemaConverter.java
+++
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroSchemaConverter.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.format.avro;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.store.types.ArrayType;
import org.apache.flink.table.store.types.DataType;
import org.apache.flink.table.store.types.DataTypeRoot;
@@ -36,11 +35,7 @@ import org.apache.avro.SchemaBuilder;
import java.util.List;
-/**
- * Converts an Avro schema into Flink's type information. It uses {@link
RowTypeInfo} for
- * representing objects and converts Avro types into types that are compatible
with Flink's Table &
- * SQL API.
- */
+/** Converts an Avro schema into Flink's type information. */
public class AvroSchemaConverter {
private AvroSchemaConverter() {
diff --git
a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index f513ee62..418a2f38 100644
---
a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++
b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -19,9 +19,9 @@
package org.apache.flink.table.store.hive;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.catalog.AbstractCatalog;
import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.table.store.file.catalog.Identifier;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -157,7 +157,7 @@ public class HiveCatalog extends AbstractCatalog {
.filter(
tableName ->
tableStoreTableExists(
- new ObjectPath(databaseName,
tableName), false))
+ new Identifier(databaseName,
tableName), false))
.collect(Collectors.toList());
} catch (UnknownDBException e) {
throw new DatabaseNotExistException(databaseName, e);
@@ -167,11 +167,11 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public TableSchema getTableSchema(ObjectPath tablePath) throws
TableNotExistException {
- if (!tableStoreTableExists(tablePath)) {
- throw new TableNotExistException(tablePath);
+ public TableSchema getTableSchema(Identifier identifier) throws
TableNotExistException {
+ if (!tableStoreTableExists(identifier)) {
+ throw new TableNotExistException(identifier);
}
- Path tableLocation = getTableLocation(tablePath);
+ Path tableLocation = getTableLocation(identifier);
return new SchemaManager(tableLocation)
.latest()
.orElseThrow(
@@ -179,41 +179,42 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public boolean tableExists(ObjectPath tablePath) {
- return tableStoreTableExists(tablePath);
+ public boolean tableExists(Identifier identifier) {
+ return tableStoreTableExists(identifier);
}
@Override
- public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
- if (!tableStoreTableExists(tablePath)) {
+ if (!tableStoreTableExists(identifier)) {
if (ignoreIfNotExists) {
return;
} else {
- throw new TableNotExistException(tablePath);
+ throw new TableNotExistException(identifier);
}
}
try {
client.dropTable(
- tablePath.getDatabaseName(), tablePath.getObjectName(),
true, false, true);
+ identifier.getDatabaseName(), identifier.getObjectName(),
true, false, true);
} catch (TException e) {
- throw new RuntimeException("Failed to drop table " +
tablePath.getFullName(), e);
+ throw new RuntimeException("Failed to drop table " +
identifier.getFullName(), e);
}
}
@Override
- public void createTable(ObjectPath tablePath, UpdateSchema updateSchema,
boolean ignoreIfExists)
+ public void createTable(
+ Identifier identifier, UpdateSchema updateSchema, boolean
ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException {
- String databaseName = tablePath.getDatabaseName();
+ String databaseName = identifier.getDatabaseName();
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException(databaseName);
}
- if (tableExists(tablePath)) {
+ if (tableExists(identifier)) {
if (ignoreIfExists) {
return;
} else {
- throw new TableAlreadyExistException(tablePath);
+ throw new TableAlreadyExistException(identifier);
}
}
@@ -222,43 +223,43 @@ public class HiveCatalog extends AbstractCatalog {
// if changes on Hive fails there is no harm to perform the same
changes to files again
TableSchema schema;
try {
- schema = schemaManager(tablePath).commitNewVersion(updateSchema);
+ schema = schemaManager(identifier).commitNewVersion(updateSchema);
} catch (Exception e) {
throw new RuntimeException(
"Failed to commit changes of table "
- + tablePath.getFullName()
+ + identifier.getFullName()
+ " to underlying files",
e);
}
- Table table = newHmsTable(tablePath);
- updateHmsTable(table, tablePath, schema);
+ Table table = newHmsTable(identifier);
+ updateHmsTable(table, identifier, schema);
try {
client.createTable(table);
} catch (TException e) {
- throw new RuntimeException("Failed to create table " +
tablePath.getFullName(), e);
+ throw new RuntimeException("Failed to create table " +
identifier.getFullName(), e);
}
}
@Override
public void alterTable(
- ObjectPath tablePath, List<SchemaChange> changes, boolean
ignoreIfNotExists)
+ Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
throws TableNotExistException {
- if (!tableStoreTableExists(tablePath)) {
+ if (!tableStoreTableExists(identifier)) {
if (ignoreIfNotExists) {
return;
} else {
- throw new TableNotExistException(tablePath);
+ throw new TableNotExistException(identifier);
}
}
try {
// first commit changes to underlying files
- TableSchema schema =
schemaManager(tablePath).commitChanges(changes);
+ TableSchema schema =
schemaManager(identifier).commitChanges(changes);
// sync to hive hms
- Table table = client.getTable(tablePath.getDatabaseName(),
tablePath.getObjectName());
- updateHmsTable(table, tablePath, schema);
- client.alter_table(tablePath.getDatabaseName(),
tablePath.getObjectName(), table);
+ Table table = client.getTable(identifier.getDatabaseName(),
identifier.getObjectName());
+ updateHmsTable(table, identifier, schema);
+ client.alter_table(identifier.getDatabaseName(),
identifier.getObjectName(), table);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -274,16 +275,16 @@ public class HiveCatalog extends AbstractCatalog {
return hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
}
- private void checkObjectPathUpperCase(ObjectPath objectPath) {
+ private void checkIdentifierUpperCase(Identifier identifier) {
checkState(
-
objectPath.getDatabaseName().equals(objectPath.getDatabaseName().toLowerCase()),
+
identifier.getDatabaseName().equals(identifier.getDatabaseName().toLowerCase()),
String.format(
"Database name[%s] cannot contain upper case",
- objectPath.getDatabaseName()));
+ identifier.getDatabaseName()));
checkState(
-
objectPath.getObjectName().equals(objectPath.getObjectName().toLowerCase()),
+
identifier.getObjectName().equals(identifier.getObjectName().toLowerCase()),
String.format(
- "Table name[%s] cannot contain upper case",
objectPath.getObjectName()));
+ "Table name[%s] cannot contain upper case",
identifier.getObjectName()));
}
private void checkFieldNamesUpperCase(List<String> fieldNames) {
@@ -303,13 +304,13 @@ public class HiveCatalog extends AbstractCatalog {
return database;
}
- private Table newHmsTable(ObjectPath tablePath) {
+ private Table newHmsTable(Identifier identifier) {
long currentTimeMillis = System.currentTimeMillis();
final TableType tableType = hiveConf.getEnum(TABLE_TYPE.key(),
TableType.MANAGED);
Table table =
new Table(
- tablePath.getObjectName(),
- tablePath.getDatabaseName(),
+ identifier.getObjectName(),
+ identifier.getDatabaseName(),
// current linux user
System.getProperty("user.name"),
(int) (currentTimeMillis / 1000),
@@ -329,19 +330,20 @@ public class HiveCatalog extends AbstractCatalog {
return table;
}
- private void updateHmsTable(Table table, ObjectPath tablePath, TableSchema
schema) {
- StorageDescriptor sd = convertToStorageDescriptor(tablePath, schema);
+ private void updateHmsTable(Table table, Identifier identifier,
TableSchema schema) {
+ StorageDescriptor sd = convertToStorageDescriptor(identifier, schema);
table.setSd(sd);
}
- private StorageDescriptor convertToStorageDescriptor(ObjectPath tablePath,
TableSchema schema) {
+ private StorageDescriptor convertToStorageDescriptor(
+ Identifier identifier, TableSchema schema) {
StorageDescriptor sd = new StorageDescriptor();
sd.setCols(
schema.fields().stream()
.map(this::convertToFieldSchema)
.collect(Collectors.toList()));
- sd.setLocation(getTableLocation(tablePath).toString());
+ sd.setLocation(getTableLocation(identifier).toString());
sd.setInputFormat(INPUT_FORMAT_CLASS_NAME);
sd.setOutputFormat(OUTPUT_FORMAT_CLASS_NAME);
@@ -361,20 +363,20 @@ public class HiveCatalog extends AbstractCatalog {
dataField.description());
}
- private boolean tableStoreTableExists(ObjectPath tablePath) {
- return tableStoreTableExists(tablePath, true);
+ private boolean tableStoreTableExists(Identifier identifier) {
+ return tableStoreTableExists(identifier, true);
}
- private boolean tableStoreTableExists(ObjectPath tablePath, boolean
throwException) {
+ private boolean tableStoreTableExists(Identifier identifier, boolean
throwException) {
Table table;
try {
- table = client.getTable(tablePath.getDatabaseName(),
tablePath.getObjectName());
+ table = client.getTable(identifier.getDatabaseName(),
identifier.getObjectName());
} catch (NoSuchObjectException e) {
return false;
} catch (TException e) {
throw new RuntimeException(
"Cannot determine if table "
- + tablePath.getFullName()
+ + identifier.getFullName()
+ " is a table store table.",
e);
}
@@ -384,7 +386,7 @@ public class HiveCatalog extends AbstractCatalog {
if (throwException) {
throw new IllegalArgumentException(
"Table "
- + tablePath.getFullName()
+ + identifier.getFullName()
+ " is not a table store table. It's input
format is "
+ table.getSd().getInputFormat()
+ " and its output format is "
@@ -396,19 +398,19 @@ public class HiveCatalog extends AbstractCatalog {
return true;
}
- private SchemaManager schemaManager(ObjectPath tablePath) {
- checkObjectPathUpperCase(tablePath);
- return new
SchemaManager(getTableLocation(tablePath)).withLock(lock(tablePath));
+ private SchemaManager schemaManager(Identifier identifier) {
+ checkIdentifierUpperCase(identifier);
+ return new
SchemaManager(getTableLocation(identifier)).withLock(lock(identifier));
}
- private Lock lock(ObjectPath tablePath) {
+ private Lock lock(Identifier identifier) {
if (!lockEnabled()) {
return new Lock.EmptyLock();
}
HiveCatalogLock lock =
new HiveCatalogLock(client, checkMaxSleep(hiveConf),
acquireTimeout(hiveConf));
- return Lock.fromCatalog(lock, tablePath);
+ return Lock.fromCatalog(lock, identifier);
}
static IMetaStoreClient createClient(HiveConf hiveConf, String
clientClassName) {
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 4c727f15..6fe32bd3 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.spark;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.catalog.CatalogFactory;
import org.apache.flink.table.store.file.operation.Lock;
@@ -202,10 +201,9 @@ public class SparkCatalog implements TableCatalog,
SupportsNamespaces {
@Override
public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
try {
- ObjectPath path = objectPath(ident);
return new SparkTable(
- catalog.getTable(path),
- Lock.factory(catalog.lockFactory().orElse(null), path),
+ catalog.getTable(toIdentifier(ident)),
+ Lock.factory(catalog.lockFactory().orElse(null),
toIdentifier(ident)),
conf);
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
@@ -217,7 +215,7 @@ public class SparkCatalog implements TableCatalog,
SupportsNamespaces {
List<SchemaChange> schemaChanges =
Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList());
try {
- catalog.alterTable(objectPath(ident), schemaChanges, false);
+ catalog.alterTable(toIdentifier(ident), schemaChanges, false);
return loadTable(ident);
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
@@ -233,7 +231,7 @@ public class SparkCatalog implements TableCatalog,
SupportsNamespaces {
throws TableAlreadyExistsException, NoSuchNamespaceException {
try {
catalog.createTable(
- objectPath(ident), toUpdateSchema(schema, partitions,
properties), false);
+ toIdentifier(ident), toUpdateSchema(schema, partitions,
properties), false);
return loadTable(ident);
} catch (Catalog.TableAlreadyExistException e) {
throw new TableAlreadyExistsException(ident);
@@ -247,7 +245,7 @@ public class SparkCatalog implements TableCatalog,
SupportsNamespaces {
@Override
public boolean dropTable(Identifier ident) {
try {
- catalog.dropTable(objectPath(ident), false);
+ catalog.dropTable(toIdentifier(ident), false);
return true;
} catch (Catalog.TableNotExistException | NoSuchTableException e) {
return false;
@@ -341,12 +339,14 @@ public class SparkCatalog implements TableCatalog,
SupportsNamespaces {
return namespace.length == 1;
}
- private ObjectPath objectPath(Identifier ident) throws
NoSuchTableException {
+ private org.apache.flink.table.store.file.catalog.Identifier
toIdentifier(Identifier ident)
+ throws NoSuchTableException {
if (!isValidateNamespace(ident.namespace())) {
throw new NoSuchTableException(ident);
}
- return new ObjectPath(ident.namespace()[0], ident.name());
+ return new org.apache.flink.table.store.file.catalog.Identifier(
+ ident.namespace()[0], ident.name());
}
// --------------------- unsupported methods ----------------------------