This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cdd5bb72f7 [core] Clean constants, caseSensitive, loader in Catalog
(#4721)
cdd5bb72f7 is described below
commit cdd5bb72f706901f6978a71832e4ee1c78934e08
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 16 22:33:22 2024 +0800
[core] Clean constants, caseSensitive, loader in Catalog (#4721)
---
docs/content/maintenance/configurations.md | 6 --
docs/content/program-api/flink-api.md | 2 +-
.../generated/catalog_configuration.html | 12 ++--
.../shortcodes/generated/core_configuration.html | 12 ++--
.../file_system_catalog_configuration.html | 36 -----------
.../apache/paimon/arrow/ArrowBundleRecords.java | 8 +--
.../java/org/apache/paimon/arrow/ArrowUtils.java | 10 +--
.../paimon/arrow/reader/ArrowBatchReader.java | 11 ++--
.../paimon/arrow/vector/ArrowFormatCWriter.java | 4 +-
.../paimon/arrow/vector/ArrowFormatWriter.java | 4 +-
.../org/apache/paimon/options/CatalogOptions.java | 9 ++-
.../java/org/apache/paimon/utils/StringUtils.java | 4 +-
.../org/apache/paimon/catalog/AbstractCatalog.java | 15 ++---
.../java/org/apache/paimon/catalog/Catalog.java | 74 ++++++++--------------
...ystemCatalogOptions.java => CatalogLoader.java} | 18 ++----
.../org/apache/paimon/catalog/CatalogUtils.java | 34 ++++++++++
.../org/apache/paimon/catalog/DelegateCatalog.java | 4 +-
.../apache/paimon/catalog/FileSystemCatalog.java | 6 +-
.../java/org/apache/paimon/jdbc/JdbcCatalog.java | 2 +-
.../java/org/apache/paimon/rest/RESTCatalog.java | 11 ++--
.../paimon/catalog/FileSystemCatalogTest.java | 4 +-
.../flink/action/cdc/CdcActionCommonUtils.java | 8 +--
.../apache/paimon/flink/action/cdc/Expression.java | 2 +-
.../cdc/MessageQueueSyncTableActionBase.java | 2 +-
.../flink/action/cdc/SyncDatabaseActionBase.java | 11 ++--
.../flink/action/cdc/SyncTableActionBase.java | 11 ++--
.../action/cdc/SynchronizationActionBase.java | 4 +-
.../flink/action/cdc/TableNameConverter.java | 6 +-
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 4 +-
.../paimon/flink/sink/cdc/CaseSensitiveUtils.java | 13 ++--
.../cdc/CdcDynamicTableParsingProcessFunction.java | 5 +-
.../cdc/CdcMultiplexRecordChannelComputer.java | 5 +-
.../sink/cdc/CdcRecordStoreMultiWriteOperator.java | 9 +--
.../paimon/flink/sink/cdc/CdcSinkBuilder.java | 6 +-
.../flink/sink/cdc/FlinkCdcMultiTableSink.java | 6 +-
.../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 6 +-
...MultiTableUpdatedDataFieldsProcessFunction.java | 3 +-
.../paimon/flink/sink/cdc/RichCdcSinkBuilder.java | 8 +--
.../sink/cdc/UpdatedDataFieldsProcessFunction.java | 4 +-
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 12 ++--
.../flink/action/cdc/SchemaEvolutionTest.java | 4 +-
.../kafka/KafkaCanalSyncDatabaseActionITCase.java | 4 +-
.../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 4 +-
.../cdc/kafka/KafkaSyncDatabaseActionITCase.java | 4 +-
.../mongodb/MongoDBSyncDatabaseActionITCase.java | 4 +-
.../cdc/mongodb/MongoDBSyncTableActionITCase.java | 4 +-
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 5 +-
.../cdc/mysql/MySqlSyncTableActionITCase.java | 5 +-
.../cdc/CdcMultiplexRecordChannelComputerTest.java | 3 +-
.../cdc/CdcRecordStoreMultiWriteOperatorTest.java | 7 +-
.../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java | 5 +-
.../sink/cdc/FlinkCdcSyncTableSinkITCase.java | 5 +-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 3 +-
.../org/apache/paimon/flink/action/ActionBase.java | 3 +-
.../flink/compact/MultiAwareBucketTableScan.java | 4 +-
.../paimon/flink/compact/MultiTableScanBase.java | 3 +-
.../flink/compact/MultiUnawareBucketTableScan.java | 4 +-
...pendOnlyMultiTableCompactionWorkerOperator.java | 9 +--
.../flink/sink/CombinedTableCompactorSink.java | 6 +-
.../sink/MultiTablesStoreCompactOperator.java | 9 +--
.../paimon/flink/sink/StoreMultiCommitter.java | 5 +-
.../flink/sink/partition/PartitionListeners.java | 5 +-
...orter.java => PartitionStatisticsReporter.java} | 17 +++--
...sListener.java => ReportPartStatsListener.java} | 22 +++----
.../CombinedTableCompactorSourceBuilder.java | 7 +-
.../source/operator/CombinedAwareBatchSource.java | 6 +-
.../operator/CombinedAwareStreamingSource.java | 6 +-
.../source/operator/CombinedCompactorSource.java | 9 +--
.../operator/CombinedUnawareBatchSource.java | 5 +-
.../operator/CombinedUnawareStreamingSource.java | 6 +-
.../source/operator/MultiTablesReadOperator.java | 7 +-
.../operator/MultiUnawareTablesReadOperator.java | 6 +-
.../paimon/flink/sink/CompactorSinkITCase.java | 3 +-
.../paimon/flink/sink/StoreMultiCommitterTest.java | 5 +-
...t.java => PartitionStatisticsReporterTest.java} | 8 ++-
.../MultiTablesCompactorSourceBuilderITCase.java | 3 +-
.../java/org/apache/paimon/hive/HiveCatalog.java | 7 +-
.../java/org/apache/paimon/spark/SparkCatalog.java | 6 +-
.../apache/paimon/spark/SparkGenericCatalog.java | 6 +-
79 files changed, 320 insertions(+), 325 deletions(-)
diff --git a/docs/content/maintenance/configurations.md
b/docs/content/maintenance/configurations.md
index 3849d70a58..99f797e68f 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/maintenance/configurations.md
@@ -38,12 +38,6 @@ Options for paimon catalog.
{{< generated/catalog_configuration >}}
-### FilesystemCatalogOptions
-
-Options for Filesystem catalog.
-
-{{< generated/file_system_catalog_configuration >}}
-
### HiveCatalogOptions
Options for Hive catalog.
diff --git a/docs/content/program-api/flink-api.md
b/docs/content/program-api/flink-api.md
index 6ecac3909c..3451b40d58 100644
--- a/docs/content/program-api/flink-api.md
+++ b/docs/content/program-api/flink-api.md
@@ -221,7 +221,7 @@ public class WriteCdcToTable {
Identifier identifier = Identifier.create("my_db", "T");
Options catalogOptions = new Options();
catalogOptions.set("warehouse", "/path/to/warehouse");
- Catalog.Loader catalogLoader =
+ CatalogLoader catalogLoader =
() -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
Table table = catalogLoader.load().getTable(identifier);
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 63f7adda1e..6355c95586 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -26,12 +26,6 @@ under the License.
</tr>
</thead>
<tbody>
- <tr>
- <td><h5>allow-upper-case</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>Boolean</td>
- <td>Indicates whether this catalog allow upper case, its default
value depends on the implementation of the specific catalog.</td>
- </tr>
<tr>
<td><h5>cache-enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
@@ -74,6 +68,12 @@ under the License.
<td>Integer</td>
<td>Controls the max number for snapshots per table in the catalog
are cached.</td>
</tr>
+ <tr>
+ <td><h5>case-sensitive</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Boolean</td>
+ <td>Indicates whether this catalog is case-sensitive.</td>
+ </tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 15b1aac935..1133de289f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -230,6 +230,12 @@ under the License.
<td>String</td>
<td>Specify the file name prefix of data files.</td>
</tr>
+ <tr>
+ <td><h5>data-file.thin-mode</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Enable data file thin mode to avoid duplicate columns
storage.</td>
+ </tr>
<tr>
<td><h5>delete-file.thread-num</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -864,12 +870,6 @@ If the data size allocated for the sorting task is
uneven,which may lead to perf
<td>Integer</td>
<td>Default spill compression zstd level. For higher compression
rates, it can be configured to 9, but the read and write speed will
significantly decrease.</td>
</tr>
- <tr>
- <td><h5>data-file.thin-mode</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Enable data file thin mode to avoid duplicate columns
storage.</td>
- </tr>
<tr>
<td><h5>streaming-read-mode</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/docs/layouts/shortcodes/generated/file_system_catalog_configuration.html
b/docs/layouts/shortcodes/generated/file_system_catalog_configuration.html
deleted file mode 100644
index c416ed6da5..0000000000
--- a/docs/layouts/shortcodes/generated/file_system_catalog_configuration.html
+++ /dev/null
@@ -1,36 +0,0 @@
-{{/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/}}
-<table class="configuration table table-bordered">
- <thead>
- <tr>
- <th class="text-left" style="width: 20%">Key</th>
- <th class="text-left" style="width: 15%">Default</th>
- <th class="text-left" style="width: 10%">Type</th>
- <th class="text-left" style="width: 55%">Description</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td><h5>case-sensitive</h5></td>
- <td style="word-wrap: break-word;">true</td>
- <td>Boolean</td>
- <td>Is case sensitive. If case insensitive, you need to set this
option to false, and the table name and fields be converted to lowercase.</td>
- </tr>
- </tbody>
-</table>
diff --git
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java
index 9627bbd85d..25f6603ec2 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java
@@ -32,13 +32,13 @@ public class ArrowBundleRecords implements BundleRecords {
private final VectorSchemaRoot vectorSchemaRoot;
private final RowType rowType;
- private final boolean allowUpperCase;
+ private final boolean caseSensitive;
public ArrowBundleRecords(
- VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean
allowUpperCase) {
+ VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean
caseSensitive) {
this.vectorSchemaRoot = vectorSchemaRoot;
this.rowType = rowType;
- this.allowUpperCase = allowUpperCase;
+ this.caseSensitive = caseSensitive;
}
public VectorSchemaRoot getVectorSchemaRoot() {
@@ -52,7 +52,7 @@ public class ArrowBundleRecords implements BundleRecords {
@Override
public Iterator<InternalRow> iterator() {
- ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType,
allowUpperCase);
+ ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType,
caseSensitive);
return arrowBatchReader.readBatch(vectorSchemaRoot).iterator();
}
}
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
index b3925a0a76..0f6a98b7a2 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
@@ -55,6 +55,8 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;
+
/** Utilities for creating Arrow objects. */
public class ArrowUtils {
@@ -66,13 +68,13 @@ public class ArrowUtils {
}
public static VectorSchemaRoot createVectorSchemaRoot(
- RowType rowType, BufferAllocator allocator, boolean
allowUpperCase) {
+ RowType rowType, BufferAllocator allocator, boolean caseSensitive)
{
List<Field> fields =
rowType.getFields().stream()
.map(
f ->
toArrowField(
- allowUpperCase ? f.name() :
f.name().toLowerCase(),
+ toLowerCaseIfNeed(f.name(),
caseSensitive),
f.id(),
f.type(),
0))
@@ -81,9 +83,9 @@ public class ArrowUtils {
}
public static FieldVector createVector(
- DataField dataField, BufferAllocator allocator, boolean
allowUpperCase) {
+ DataField dataField, BufferAllocator allocator, boolean
caseSensitive) {
return toArrowField(
- allowUpperCase ? dataField.name() :
dataField.name().toLowerCase(),
+ toLowerCaseIfNeed(dataField.name(), caseSensitive),
dataField.id(),
dataField.type(),
0)
diff --git
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java
index 9d20062b43..b626758ded 100644
---
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java
+++
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java
@@ -34,6 +34,8 @@ import org.apache.arrow.vector.types.pojo.Schema;
import java.util.Iterator;
import java.util.List;
+import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;
+
/** Reader from a {@link VectorSchemaRoot} to paimon rows. */
public class ArrowBatchReader {
@@ -41,9 +43,9 @@ public class ArrowBatchReader {
private final VectorizedColumnBatch batch;
private final Arrow2PaimonVectorConverter[] convertors;
private final RowType projectedRowType;
- private final boolean allowUpperCase;
+ private final boolean caseSensitive;
- public ArrowBatchReader(RowType rowType, boolean allowUpperCase) {
+ public ArrowBatchReader(RowType rowType, boolean caseSensitive) {
this.internalRowSerializer = new InternalRowSerializer(rowType);
ColumnVector[] columnVectors = new
ColumnVector[rowType.getFieldCount()];
this.convertors = new
Arrow2PaimonVectorConverter[rowType.getFieldCount()];
@@ -53,7 +55,7 @@ public class ArrowBatchReader {
for (int i = 0; i < columnVectors.length; i++) {
this.convertors[i] =
Arrow2PaimonVectorConverter.construct(rowType.getTypeAt(i));
}
- this.allowUpperCase = allowUpperCase;
+ this.caseSensitive = caseSensitive;
}
public Iterable<InternalRow> readBatch(VectorSchemaRoot vsr) {
@@ -63,8 +65,7 @@ public class ArrowBatchReader {
for (int i = 0; i < dataFields.size(); ++i) {
try {
String fieldName = dataFields.get(i).name();
- Field field =
- arrowSchema.findField(allowUpperCase ? fieldName :
fieldName.toLowerCase());
+ Field field =
arrowSchema.findField(toLowerCaseIfNeed(fieldName, caseSensitive));
int idx = arrowSchema.getFields().indexOf(field);
mapping[i] = idx;
} catch (IllegalArgumentException e) {
diff --git
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
index 10afcaf691..442457813a 100644
---
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
+++
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
@@ -37,8 +37,8 @@ public class ArrowFormatCWriter implements AutoCloseable {
private final ArrowSchema schema;
private final ArrowFormatWriter realWriter;
- public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean
allowUpperCase) {
- this.realWriter = new ArrowFormatWriter(rowType, writeBatchSize,
allowUpperCase);
+ public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean
caseSensitive) {
+ this.realWriter = new ArrowFormatWriter(rowType, writeBatchSize,
caseSensitive);
RootAllocator allocator = realWriter.getAllocator();
array = ArrowArray.allocateNew(allocator);
schema = ArrowSchema.allocateNew(allocator);
diff --git
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
index acdb5d0dcb..9f55792197 100644
---
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
+++
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
@@ -43,10 +43,10 @@ public class ArrowFormatWriter implements AutoCloseable {
private final RootAllocator allocator;
private int rowId;
- public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean
allowUpperCase) {
+ public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean
caseSensitive) {
allocator = new RootAllocator();
- vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType,
allocator, allowUpperCase);
+ vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType,
allocator, caseSensitive);
fieldWriters = new ArrowFieldWriter[rowType.getFieldCount()];
diff --git
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index bb8cfae682..b22274e011 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -128,13 +128,12 @@ public class CatalogOptions {
.withDescription(
"Controls the max number for snapshots per table
in the catalog are cached.");
- public static final ConfigOption<Boolean> ALLOW_UPPER_CASE =
- ConfigOptions.key("allow-upper-case")
+ public static final ConfigOption<Boolean> CASE_SENSITIVE =
+ ConfigOptions.key("case-sensitive")
.booleanType()
.noDefaultValue()
- .withDescription(
- "Indicates whether this catalog allow upper case, "
- + "its default value depends on the
implementation of the specific catalog.");
+ .withFallbackKeys("allow-upper-case")
+ .withDescription("Indicates whether this catalog is
case-sensitive.");
public static final ConfigOption<Boolean> SYNC_ALL_PROPERTIES =
ConfigOptions.key("sync-all-properties")
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index e184624c0b..c4e07e0a69 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -542,8 +542,8 @@ public class StringUtils {
return "`" + str + "`";
}
- public static String caseSensitiveConversion(String str, boolean
allowUpperCase) {
- return allowUpperCase ? str : str.toLowerCase();
+ public static String toLowerCaseIfNeed(String str, boolean caseSensitive) {
+ return caseSensitive ? str : str.toLowerCase();
}
public static boolean isNumeric(final CharSequence cs) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index a1b41e3b8a..db69092955 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -60,7 +60,6 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.CoreOptions.createCommitUser;
-import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
@@ -82,7 +81,7 @@ public abstract class AbstractCatalog implements Catalog {
protected AbstractCatalog(FileIO fileIO, Options options) {
this.fileIO = fileIO;
- this.tableDefaultOptions =
Catalog.tableDefaultOptions(options.toMap());
+ this.tableDefaultOptions =
CatalogUtils.tableDefaultOptions(options.toMap());
this.catalogOptions = options;
}
@@ -123,11 +122,6 @@ public abstract class AbstractCatalog implements Catalog {
return
catalogOptions.getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore());
}
- @Override
- public boolean allowUpperCase() {
- return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(true);
- }
-
protected boolean allowCustomTablePath() {
return false;
}
@@ -559,8 +553,9 @@ public abstract class AbstractCatalog implements Catalog {
}
protected void validateIdentifierNameCaseInsensitive(Identifier
identifier) {
- Catalog.validateCaseInsensitive(allowUpperCase(), "Database",
identifier.getDatabaseName());
- Catalog.validateCaseInsensitive(allowUpperCase(), "Table",
identifier.getObjectName());
+ CatalogUtils.validateCaseInsensitive(
+ caseSensitive(), "Database", identifier.getDatabaseName());
+ CatalogUtils.validateCaseInsensitive(caseSensitive(), "Table",
identifier.getObjectName());
}
private void
validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> changes) {
@@ -578,7 +573,7 @@ public abstract class AbstractCatalog implements Catalog {
}
protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
- Catalog.validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
+ CatalogUtils.validateCaseInsensitive(caseSensitive(), "Field",
fieldNames);
}
private void validateAutoCreateClose(Map<String, String> options) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index c3808caa13..7b1fe0ea07 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -26,15 +26,9 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
import org.apache.paimon.view.View;
-import java.io.Serializable;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
-
-import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
* This interface is responsible for reading and writing metadata such as
database/table from a
@@ -46,30 +40,38 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
@Public
public interface Catalog extends AutoCloseable {
- String DEFAULT_DATABASE = "default";
-
+ // constants for system table and database
String SYSTEM_TABLE_SPLITTER = "$";
String SYSTEM_DATABASE_NAME = "sys";
String SYSTEM_BRANCH_PREFIX = "branch_";
- String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
- String DB_SUFFIX = ".db";
+ // constants for table and database
String COMMENT_PROP = "comment";
String OWNER_PROP = "owner";
+
+ // constants for database
+ String DEFAULT_DATABASE = "default";
+ String DB_SUFFIX = ".db";
String DB_LOCATION_PROP = "location";
+
+ // constants for table
+ String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
String NUM_ROWS_PROP = "numRows";
String NUM_FILES_PROP = "numFiles";
String TOTAL_SIZE_PROP = "totalSize";
String LAST_UPDATE_TIME_PROP = "lastUpdateTime";
- String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime";
- /** Warehouse root path containing all database directories in this
catalog. */
+ /** Warehouse root path for creating new databases. */
String warehouse();
- /** Catalog options. */
+ /** {@link FileIO} of this catalog. It can access {@link #warehouse()}
path. */
+ FileIO fileIO();
+
+ /** Catalog options for re-creating this catalog. */
Map<String, String> options();
- FileIO fileIO();
+ /** Return a boolean that indicates whether this catalog is
case-sensitive. */
+ boolean caseSensitive();
/**
* Get the names of all databases in this catalog.
@@ -325,44 +327,30 @@ public interface Catalog extends AutoCloseable {
throw new UnsupportedOperationException();
}
- /** Return a boolean that indicates whether this catalog allow upper case.
*/
- boolean allowUpperCase();
-
+ /**
+ * Repair the entire Catalog, repair the metadata in the metastore
consistent with the metadata
+ * in the filesystem, register missing tables in the metastore.
+ */
default void repairCatalog() {
throw new UnsupportedOperationException();
}
+ /**
+ * Repair the entire database, repair the metadata in the metastore
consistent with the metadata
+ * in the filesystem, register missing tables in the metastore.
+ */
default void repairDatabase(String databaseName) {
throw new UnsupportedOperationException();
}
+ /**
+ * Repair the table, repair the metadata in the metastore consistent with
the metadata in the
+ * filesystem.
+ */
default void repairTable(Identifier identifier) throws
TableNotExistException {
throw new UnsupportedOperationException();
}
- static Map<String, String> tableDefaultOptions(Map<String, String>
options) {
- return convertToPropertiesPrefixKey(options,
TABLE_DEFAULT_OPTION_PREFIX);
- }
-
- /** Validate database, table and field names must be lowercase when not
case-sensitive. */
- static void validateCaseInsensitive(boolean caseSensitive, String type,
String... names) {
- validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
- }
-
- /** Validate database, table and field names must be lowercase when not
case-sensitive. */
- static void validateCaseInsensitive(boolean caseSensitive, String type,
List<String> names) {
- if (caseSensitive) {
- return;
- }
- List<String> illegalNames =
- names.stream().filter(f ->
!f.equals(f.toLowerCase())).collect(Collectors.toList());
- checkArgument(
- illegalNames.isEmpty(),
- String.format(
- "%s name %s cannot contain upper case in the catalog.",
- type, illegalNames));
- }
-
/** Exception for trying to drop on a database that is not empty. */
class DatabaseNotEmptyException extends Exception {
private static final String MSG = "Database %s is not empty.";
@@ -599,10 +587,4 @@ public interface Catalog extends AutoCloseable {
return identifier;
}
}
-
- /** Loader of {@link Catalog}. */
- @FunctionalInterface
- interface Loader extends Serializable {
- Catalog load();
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogOptions.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java
similarity index 55%
rename from
paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogOptions.java
rename to paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java
index e656742b42..c8de08139c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java
@@ -18,19 +18,11 @@
package org.apache.paimon.catalog;
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
+import java.io.Serializable;
-/** Options for filesystem catalog. */
-public final class FileSystemCatalogOptions {
+/** Loader for creating a {@link Catalog}. */
+@FunctionalInterface
+public interface CatalogLoader extends Serializable {
- public static final ConfigOption<Boolean> CASE_SENSITIVE =
- ConfigOptions.key("case-sensitive")
- .booleanType()
- .defaultValue(true)
- .withFallbackKeys("allow-upper-case")
- .withDescription(
- "Is case sensitive. If case insensitive, you need
to set this option to false, and the table name and fields be converted to
lowercase.");
-
- private FileSystemCatalogOptions() {}
+ Catalog load();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 39f81833a9..bae23c6276 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -21,6 +21,15 @@ package org.apache.paimon.catalog;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
+import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** Utils for {@link Catalog}. */
public class CatalogUtils {
@@ -51,4 +60,29 @@ public class CatalogUtils {
public static String table(String path) {
return SchemaManager.identifierFromPath(path, false).getObjectName();
}
+
+ public static Map<String, String> tableDefaultOptions(Map<String, String>
options) {
+ return convertToPropertiesPrefixKey(options,
TABLE_DEFAULT_OPTION_PREFIX);
+ }
+
+ /** Validate database, table and field names must be lowercase when not
case-sensitive. */
+ public static void validateCaseInsensitive(
+ boolean caseSensitive, String type, String... names) {
+ validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
+ }
+
+ /** Validate database, table and field names must be lowercase when not
case-sensitive. */
+ public static void validateCaseInsensitive(
+ boolean caseSensitive, String type, List<String> names) {
+ if (caseSensitive) {
+ return;
+ }
+ List<String> illegalNames =
+ names.stream().filter(f ->
!f.equals(f.toLowerCase())).collect(Collectors.toList());
+ checkArgument(
+ illegalNames.isEmpty(),
+ String.format(
+ "%s name %s cannot contain upper case in the catalog.",
+ type, illegalNames));
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 2298626b0e..93e8ce2581 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -42,8 +42,8 @@ public class DelegateCatalog implements Catalog {
}
@Override
- public boolean allowUpperCase() {
- return wrapped.allowUpperCase();
+ public boolean caseSensitive() {
+ return wrapped.caseSensitive();
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 9264a54647..279ddb26ee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -34,7 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import static
org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
+import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
/** A catalog implementation for {@link FileIO}. */
public class FileSystemCatalog extends AbstractCatalog {
@@ -158,7 +158,7 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- public boolean allowUpperCase() {
- return catalogOptions.get(CASE_SENSITIVE);
+ public boolean caseSensitive() {
+ return catalogOptions.getOptional(CASE_SENSITIVE).orElse(true);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 778bc591fe..551b2d8fc9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -320,7 +320,7 @@ public class JdbcCatalog extends AbstractCatalog {
}
@Override
- public boolean allowUpperCase() {
+ public boolean caseSensitive() {
return false;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 86b87e25e8..c30e1109e2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -52,6 +52,7 @@ import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
+import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
/** A catalog implementation for REST. */
@@ -61,7 +62,7 @@ public class RESTCatalog implements Catalog {
private final RESTClient client;
private final ResourcePaths resourcePaths;
- private final Map<String, String> options;
+ private final Options options;
private final Map<String, String> baseHeader;
private final AuthSession catalogAuth;
@@ -99,7 +100,7 @@ public class RESTCatalog implements Catalog {
}
Map<String, String> initHeaders =
RESTUtil.merge(configHeaders(options.toMap()),
this.catalogAuth.getHeaders());
- this.options = fetchOptionsFromServer(initHeaders, options.toMap());
+ this.options = new Options(fetchOptionsFromServer(initHeaders,
options.toMap()));
this.resourcePaths =
ResourcePaths.forCatalogProperties(
this.options.get(RESTCatalogInternalOptions.PREFIX));
@@ -112,7 +113,7 @@ public class RESTCatalog implements Catalog {
@Override
public Map<String, String> options() {
- return this.options;
+ return this.options.toMap();
}
@Override
@@ -223,8 +224,8 @@ public class RESTCatalog implements Catalog {
}
@Override
- public boolean allowUpperCase() {
- return false;
+ public boolean caseSensitive() {
+ return options.getOptional(CASE_SENSITIVE).orElse(true);
}
@Override
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
index 303a9d8733..65ea6721c2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
@@ -36,12 +36,12 @@ public class FileSystemCatalogTest extends CatalogTestBase {
public void setUp() throws Exception {
super.setUp();
Options catalogOptions = new Options();
- catalogOptions.set(CatalogOptions.ALLOW_UPPER_CASE, false);
+ catalogOptions.set(CatalogOptions.CASE_SENSITIVE, false);
catalog = new FileSystemCatalog(fileIO, new Path(warehouse),
catalogOptions);
}
@Test
- public void testCreateTableAllowUpperCase() throws Exception {
+ public void testCreateTableCaseSensitive() throws Exception {
catalog.createDatabase("test_db", false);
Identifier identifier = Identifier.create("test_db", "new_table");
Schema schema =
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index c8af6f91c4..6482a625f4 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -42,7 +42,7 @@ import static
org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
-import static org.apache.paimon.utils.StringUtils.caseSensitiveConversion;
+import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;
/** Common utils for CDC Action. */
public class CdcActionCommonUtils {
@@ -129,21 +129,21 @@ public class CdcActionCommonUtils {
List<String> allFieldNames = new ArrayList<>();
for (DataField field : sourceSchema.fields()) {
- String fieldName = caseSensitiveConversion(field.name(),
caseSensitive);
+ String fieldName = toLowerCaseIfNeed(field.name(), caseSensitive);
allFieldNames.add(fieldName);
builder.column(fieldName, field.type(), field.description());
}
for (ComputedColumn computedColumn : computedColumns) {
String computedColumnName =
- caseSensitiveConversion(computedColumn.columnName(),
caseSensitive);
+ toLowerCaseIfNeed(computedColumn.columnName(),
caseSensitive);
allFieldNames.add(computedColumnName);
builder.column(computedColumnName, computedColumn.columnType());
}
for (CdcMetadataConverter metadataConverter : metadataConverters) {
String metadataColumnName =
- caseSensitiveConversion(metadataConverter.columnName(),
caseSensitive);
+ toLowerCaseIfNeed(metadataConverter.columnName(),
caseSensitive);
allFieldNames.add(metadataColumnName);
builder.column(metadataColumnName, metadataConverter.dataType());
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 2e0a131929..3290ec1829 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -189,7 +189,7 @@ public interface Expression extends Serializable {
String[] literals =
Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new);
String referencedFieldCheckForm =
- StringUtils.caseSensitiveConversion(referencedField,
caseSensitive);
+ StringUtils.toLowerCaseIfNeed(referencedField,
caseSensitive);
DataType fieldType =
checkNotNull(
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
index 9c629b5a51..3af0957ce6 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
@@ -86,7 +86,7 @@ public abstract class MessageQueueSyncTableActionBase extends
SyncTableActionBas
tableConfig,
retrievedSchema,
metadataConverters,
- allowUpperCase,
+ caseSensitive,
true,
false);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 4fb1339c51..56334c1e7b 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.action.cdc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -155,9 +156,9 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
@Override
protected void validateCaseSensitivity() {
- Catalog.validateCaseInsensitive(allowUpperCase, "Database", database);
- Catalog.validateCaseInsensitive(allowUpperCase, "Table prefix",
tablePrefix);
- Catalog.validateCaseInsensitive(allowUpperCase, "Table suffix",
tableSuffix);
+ CatalogUtils.validateCaseInsensitive(caseSensitive, "Database",
database);
+ CatalogUtils.validateCaseInsensitive(caseSensitive, "Table prefix",
tablePrefix);
+ CatalogUtils.validateCaseInsensitive(caseSensitive, "Table suffix",
tableSuffix);
}
@Override
@@ -179,7 +180,7 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
NewTableSchemaBuilder schemaBuilder =
new NewTableSchemaBuilder(
tableConfig,
- allowUpperCase,
+ caseSensitive,
partitionKeys,
primaryKeys,
requirePrimaryKeys(),
@@ -190,7 +191,7 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
excludingTables == null ? null :
Pattern.compile(excludingTables);
TableNameConverter tableNameConverter =
new TableNameConverter(
- allowUpperCase,
+ caseSensitive,
mergeShards,
dbPrefix,
dbSuffix,
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index 87efeb2a19..6fcdbd44bc 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
@@ -107,15 +108,15 @@ public abstract class SyncTableActionBase extends
SynchronizationActionBase {
tableConfig,
retrievedSchema,
metadataConverters,
- allowUpperCase,
+ caseSensitive,
true,
true);
}
@Override
protected void validateCaseSensitivity() {
- Catalog.validateCaseInsensitive(allowUpperCase, "Database", database);
- Catalog.validateCaseInsensitive(allowUpperCase, "Table", table);
+ CatalogUtils.validateCaseInsensitive(caseSensitive, "Database",
database);
+ CatalogUtils.validateCaseInsensitive(caseSensitive, "Table", table);
}
@Override
@@ -142,7 +143,7 @@ public abstract class SyncTableActionBase extends
SynchronizationActionBase {
buildComputedColumns(
computedColumnArgs,
fileStoreTable.schema().fields(),
- allowUpperCase);
+ caseSensitive);
// check partition keys and primary keys in case that user
specified them
checkConstraints();
}
@@ -162,7 +163,7 @@ public abstract class SyncTableActionBase extends
SynchronizationActionBase {
@Override
protected EventParser.Factory<RichCdcMultiplexRecord>
buildEventParserFactory() {
- boolean caseSensitive = this.allowUpperCase;
+ boolean caseSensitive = this.caseSensitive;
return () -> new RichCdcMultiplexRecordEventParser(caseSensitive);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index a7c7703474..d755b200a9 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -64,7 +64,7 @@ public abstract class SynchronizationActionBase extends
ActionBase {
protected final String database;
protected final Configuration cdcSourceConfig;
protected final SyncJobHandler syncJobHandler;
- protected final boolean allowUpperCase;
+ protected final boolean caseSensitive;
protected Map<String, String> tableConfig = new HashMap<>();
protected TypeMapping typeMapping = TypeMapping.defaultMapping();
@@ -80,7 +80,7 @@ public abstract class SynchronizationActionBase extends
ActionBase {
this.database = database;
this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
this.syncJobHandler = syncJobHandler;
- this.allowUpperCase = catalog.allowUpperCase();
+ this.caseSensitive = catalog.caseSensitive();
this.syncJobHandler.registerJdbcDriver();
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
index 15fc3507ce..7dd63ed227 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
@@ -24,6 +24,8 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;
+
/** Used to convert a MySQL source table name to corresponding Paimon table
name. */
public class TableNameConverter implements Serializable {
@@ -78,7 +80,7 @@ public class TableNameConverter implements Serializable {
// top priority: table mapping
if (tableMapping.containsKey(originTblName.toLowerCase())) {
String mappedName = tableMapping.get(originTblName.toLowerCase());
- return caseSensitive ? mappedName : mappedName.toLowerCase();
+ return toLowerCaseIfNeed(mappedName, caseSensitive);
}
String tblPrefix = prefix;
@@ -93,7 +95,7 @@ public class TableNameConverter implements Serializable {
}
// third priority: normal prefix and suffix
- String tableName = caseSensitive ? originTblName :
originTblName.toLowerCase();
+ String tableName = toLowerCaseIfNeed(originTblName, caseSensitive);
return tblPrefix + tableName + tblSuffix;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index ce2e9124a6..0f452e2834 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -139,7 +139,7 @@ public class MySqlSyncDatabaseAction extends
SyncDatabaseActionBase {
TableNameConverter tableNameConverter =
new TableNameConverter(
- allowUpperCase, mergeShards, tablePrefix, tableSuffix,
tableMapping);
+ caseSensitive, mergeShards, tablePrefix, tableSuffix,
tableMapping);
for (JdbcTableInfo tableInfo : jdbcTableInfos) {
Identifier identifier =
Identifier.create(
@@ -155,7 +155,7 @@ public class MySqlSyncDatabaseAction extends
SyncDatabaseActionBase {
tableConfig,
tableInfo.schema(),
metadataConverters,
- allowUpperCase,
+ caseSensitive,
false,
true);
try {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
index 4892aee030..e80692ed22 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -28,8 +29,8 @@ import org.apache.flink.util.Collector;
public class CaseSensitiveUtils {
public static DataStream<CdcRecord> cdcRecordConvert(
- Catalog.Loader catalogLoader, DataStream<CdcRecord> input) {
- if (allowUpperCase(catalogLoader)) {
+ CatalogLoader catalogLoader, DataStream<CdcRecord> input) {
+ if (caseSensitive(catalogLoader)) {
return input;
}
@@ -46,8 +47,8 @@ public class CaseSensitiveUtils {
}
public static DataStream<CdcMultiplexRecord> cdcMultiplexRecordConvert(
- Catalog.Loader catalogLoader, DataStream<CdcMultiplexRecord>
input) {
- if (allowUpperCase(catalogLoader)) {
+ CatalogLoader catalogLoader, DataStream<CdcMultiplexRecord> input)
{
+ if (caseSensitive(catalogLoader)) {
return input;
}
@@ -65,9 +66,9 @@ public class CaseSensitiveUtils {
.name("Case-insensitive Convert");
}
- private static boolean allowUpperCase(Catalog.Loader catalogLoader) {
+ private static boolean caseSensitive(CatalogLoader catalogLoader) {
try (Catalog catalog = catalogLoader.load()) {
- return catalog.allowUpperCase();
+ return catalog.caseSensitive();
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 886e33e204..4efcf1207e 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.types.DataField;
@@ -62,13 +63,13 @@ public class CdcDynamicTableParsingProcessFunction<T>
extends ProcessFunction<T,
private final EventParser.Factory<T> parserFactory;
private final String database;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
private transient EventParser<T> parser;
private transient Catalog catalog;
public CdcDynamicTableParsingProcessFunction(
- String database, Catalog.Loader catalogLoader,
EventParser.Factory<T> parserFactory) {
+ String database, CatalogLoader catalogLoader,
EventParser.Factory<T> parserFactory) {
// for now, only support single database
this.database = database;
this.catalogLoader = catalogLoader;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
index fdad3a921d..2858b2d4eb 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
@@ -38,13 +39,13 @@ public class CdcMultiplexRecordChannelComputer implements
ChannelComputer<CdcMul
LoggerFactory.getLogger(CdcMultiplexRecordChannelComputer.class);
private static final long serialVersionUID = 1L;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
private transient int numChannels;
private Map<Identifier, CdcRecordChannelComputer> channelComputers;
- public CdcMultiplexRecordChannelComputer(Catalog.Loader catalogLoader) {
+ public CdcMultiplexRecordChannelComputer(CatalogLoader catalogLoader) {
this.catalogLoader = catalogLoader;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index 5db111a300..9387a82938 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.MultiTableCommittable;
@@ -67,7 +68,7 @@ public class CdcRecordStoreMultiWriteOperator
private final StoreSinkWrite.WithWriteBufferProvider
storeSinkWriteProvider;
private final String initialCommitUser;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
private MemoryPoolFactory memoryPoolFactory;
private Catalog catalog;
@@ -79,7 +80,7 @@ public class CdcRecordStoreMultiWriteOperator
private CdcRecordStoreMultiWriteOperator(
StreamOperatorParameters<MultiTableCommittable> parameters,
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
String initialCommitUser,
Options options) {
@@ -264,10 +265,10 @@ public class CdcRecordStoreMultiWriteOperator
extends PrepareCommitOperator.Factory<CdcMultiplexRecord,
MultiTableCommittable> {
private final StoreSinkWrite.WithWriteBufferProvider
storeSinkWriteProvider;
private final String initialCommitUser;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
public Factory(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
String initialCommitUser,
Options options) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 28b68fedc3..5c27db6ddf 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.annotation.Experimental;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.schema.SchemaManager;
@@ -48,7 +48,7 @@ public class CdcSinkBuilder<T> {
private EventParser.Factory<T> parserFactory = null;
private Table table = null;
private Identifier identifier = null;
- private Catalog.Loader catalogLoader = null;
+ private CatalogLoader catalogLoader = null;
@Nullable private Integer parallelism;
@@ -77,7 +77,7 @@ public class CdcSinkBuilder<T> {
return this;
}
- public CdcSinkBuilder<T> withCatalogLoader(Catalog.Loader catalogLoader) {
+ public CdcSinkBuilder<T> withCatalogLoader(CatalogLoader catalogLoader) {
this.catalogLoader = catalogLoader;
return this;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 1688d4deb0..4cd9235cb5 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterOperatorFactory;
@@ -60,13 +60,13 @@ public class FlinkCdcMultiTableSink implements Serializable
{
private static final String GLOBAL_COMMITTER_NAME = "Multiplex Global
Committer";
private final boolean isOverwrite = false;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
private final double commitCpuCores;
@Nullable private final MemorySize commitHeapMemory;
private final String commitUser;
public FlinkCdcMultiTableSink(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
double commitCpuCores,
@Nullable MemorySize commitHeapMemory,
String commitUser) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index a9ad66847b..bd18c7e7ad 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
@@ -72,7 +72,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
// it will check newly added tables and create the corresponding
// Paimon tables. 2) in multiplex sink where it is used to
// initialize different writers to multiple tables.
- private Catalog.Loader catalogLoader;
+ private CatalogLoader catalogLoader;
// database to sync, currently only support single database
private String database;
private MultiTablesSinkMode mode;
@@ -111,7 +111,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
return this;
}
- public FlinkCdcSyncDatabaseSinkBuilder<T> withCatalogLoader(Catalog.Loader
catalogLoader) {
+ public FlinkCdcSyncDatabaseSinkBuilder<T> withCatalogLoader(CatalogLoader
catalogLoader) {
this.catalogLoader = catalogLoader;
return this;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index 0ad412e47d..dd612a52c2 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -51,7 +52,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
private final Map<Identifier, SchemaManager> schemaManagers = new
HashMap<>();
- public MultiTableUpdatedDataFieldsProcessFunction(Catalog.Loader
catalogLoader) {
+ public MultiTableUpdatedDataFieldsProcessFunction(CatalogLoader
catalogLoader) {
super(catalogLoader);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
index 610856d3af..43f63854bb 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.annotation.Public;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
@@ -39,7 +39,7 @@ public class RichCdcSinkBuilder {
private DataStream<RichCdcRecord> input = null;
private Table table = null;
private Identifier identifier = null;
- private Catalog.Loader catalogLoader = null;
+ private CatalogLoader catalogLoader = null;
@Nullable private Integer parallelism;
@@ -62,7 +62,7 @@ public class RichCdcSinkBuilder {
return this;
}
- public RichCdcSinkBuilder catalogLoader(Catalog.Loader catalogLoader) {
+ public RichCdcSinkBuilder catalogLoader(CatalogLoader catalogLoader) {
this.catalogLoader = catalogLoader;
return this;
}
@@ -114,7 +114,7 @@ public class RichCdcSinkBuilder {
/** @deprecated Use {@link #catalogLoader}. */
@Deprecated
- public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
+ public RichCdcSinkBuilder withCatalogLoader(CatalogLoader catalogLoader) {
this.catalogLoader = catalogLoader;
return this;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 64f00d96b0..504f631058 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -53,7 +53,7 @@ public class UpdatedDataFieldsProcessFunction
private Set<FieldIdentifier> latestFields;
public UpdatedDataFieldsProcessFunction(
- SchemaManager schemaManager, Identifier identifier, Catalog.Loader
catalogLoader) {
+ SchemaManager schemaManager, Identifier identifier, CatalogLoader
catalogLoader) {
super(catalogLoader);
this.schemaManager = schemaManager;
this.identifier = identifier;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 4f02b784c2..d50df23742 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -48,9 +49,9 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I,
O> extends Process
private static final Logger LOG =
LoggerFactory.getLogger(UpdatedDataFieldsProcessFunctionBase.class);
- protected final Catalog.Loader catalogLoader;
+ protected final CatalogLoader catalogLoader;
protected Catalog catalog;
- private boolean allowUpperCase;
+ private boolean caseSensitive;
private static final List<DataTypeRoot> STRING_TYPES =
Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
@@ -70,7 +71,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I,
O> extends Process
private static final List<DataTypeRoot> TIMESTAMP_TYPES =
Arrays.asList(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
- protected UpdatedDataFieldsProcessFunctionBase(Catalog.Loader
catalogLoader) {
+ protected UpdatedDataFieldsProcessFunctionBase(CatalogLoader
catalogLoader) {
this.catalogLoader = catalogLoader;
}
@@ -86,7 +87,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I,
O> extends Process
*/
public void open(Configuration parameters) {
this.catalog = catalogLoader.load();
- this.allowUpperCase = this.catalog.allowUpperCase();
+ this.caseSensitive = this.catalog.caseSensitive();
}
protected void applySchemaChange(
@@ -215,8 +216,7 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
List<SchemaChange> result = new ArrayList<>();
for (DataField newField : updatedDataFields) {
- String newFieldName =
- StringUtils.caseSensitiveConversion(newField.name(),
allowUpperCase);
+ String newFieldName =
StringUtils.toLowerCaseIfNeed(newField.name(), caseSensitive);
if (oldFields.containsKey(newFieldName)) {
DataField oldField = oldFields.get(newFieldName);
// we compare by ignoring nullable, because partition keys and
primary keys might be
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
index 9ba1837686..46c8e98fb6 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.action.cdc;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
@@ -202,7 +202,7 @@ public class SchemaEvolutionTest extends TableTestBase {
DataStream<List<DataField>> upDataFieldStream =
env.fromCollection(prepareData());
Options options = new Options();
options.set("warehouse", tempPath.toString());
- final Catalog.Loader catalogLoader = () ->
FlinkCatalogFactory.createPaimonCatalog(options);
+ final CatalogLoader catalogLoader = () ->
FlinkCatalogFactory.createPaimonCatalog(options);
Identifier identifier = Identifier.create(database, tableName);
DataStream<Void> schemaChangeProcessFunction =
upDataFieldStream
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 0e1f4e72ea..6e37c589ac 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.action.cdc.kafka;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -535,7 +535,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
.withTableConfig(getBasicTableConfig())
.withCatalogConfig(
Collections.singletonMap(
-
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(),
"false"))
.build();
runActionWithDefaultEnv(action);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 8a4dc2f303..ed1885f5d7 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -1121,7 +1121,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
.withTableConfig(getBasicTableConfig())
.withCatalogConfig(
Collections.singletonMap(
-
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(),
"false"))
.withComputedColumnArgs("_YEAR=year(_DATE)")
.build();
runActionWithDefaultEnv(action);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
index de189bc205..606c46e90e 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.action.cdc.kafka;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -475,7 +475,7 @@ public class KafkaSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
.withTableConfig(getBasicTableConfig())
.withCatalogConfig(
Collections.singletonMap(
-
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(),
"false"))
.build();
runActionWithDefaultEnv(action);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index ae0b0b412a..92c2a7243a 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.action.cdc.mongodb;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -196,7 +196,7 @@ public class MongoDBSyncDatabaseActionITCase extends
MongoDBActionITCaseBase {
.withTableConfig(getBasicTableConfig())
.withCatalogConfig(
Collections.singletonMap(
-
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(),
"false"))
.build();
runActionWithDefaultEnv(action);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
index b4f31f2d6d..2d8489dc23 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.action.cdc.mongodb;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -382,7 +382,7 @@ public class MongoDBSyncTableActionITCase extends
MongoDBActionITCaseBase {
.withTableConfig(getBasicTableConfig())
.withCatalogConfig(
Collections.singletonMap(
-
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(),
"false"))
.withComputedColumnArgs("_YEAR=year(_DATE)")
.build();
runActionWithDefaultEnv(action);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index b48b898d66..10ee548125 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.options.CatalogOptions;
@@ -475,7 +474,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
syncDatabaseActionBuilder(mySqlConfig)
.withCatalogConfig(
Collections.singletonMap(
-
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(),
"false"))
.withTableConfig(getBasicTableConfig())
.build();
runActionWithDefaultEnv(action);
@@ -496,7 +495,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
syncDatabaseActionBuilder(mySqlConfig)
.withCatalogConfig(
Collections.singletonMap(
-
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(),
"false"))
.withMode(COMBINED.configString())
.withTableConfig(getBasicTableConfig())
.build();
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index febbe4e1de..749d87eb06 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -1327,7 +1326,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
syncTableActionBuilder(mySqlConfig)
.withCatalogConfig(
Collections.singletonMap(
-
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(),
"false"))
.withComputedColumnArgs("SUBSTRING=substring(UPPERCASE_STRING,2)")
.build();
runActionWithDefaultEnv(action);
@@ -1363,7 +1362,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
syncTableActionBuilder(mySqlConfig)
.withCatalogConfig(
Collections.singletonMap(
-
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ CatalogOptions.CASE_SENSITIVE.key(),
"false"))
.withPrimaryKeys("ID1", "PART")
.withPartitionKeys("PART")
.build();
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
index 867cbdbae0..43b7d2ba63 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.CatalogOptions;
@@ -54,7 +55,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class CdcMultiplexRecordChannelComputerTest {
@TempDir java.nio.file.Path tempDir;
- private Catalog.Loader catalogLoader;
+ private CatalogLoader catalogLoader;
private Path warehouse;
private String databaseName;
private Identifier tableWithPartition;
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 9f35b25026..4436aa392d 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
@@ -82,7 +83,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
private Identifier firstTable;
private Catalog catalog;
private Identifier secondTable;
- private Catalog.Loader catalogLoader;
+ private CatalogLoader catalogLoader;
private Schema firstTableSchema;
@BeforeEach
@@ -340,7 +341,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
harness.close();
}
- private Catalog.Loader createCatalogLoader() {
+ private CatalogLoader createCatalogLoader() {
Options catalogOptions = createCatalogOptions(warehouse);
return () ->
CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
}
@@ -688,7 +689,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
}
private OneInputStreamOperatorTestHarness<CdcMultiplexRecord,
MultiTableCommittable>
- createTestHarness(Catalog.Loader catalogLoader) throws Exception {
+ createTestHarness(CatalogLoader catalogLoader) throws Exception {
CdcRecordStoreMultiWriteOperator.Factory operatorFactory =
new CdcRecordStoreMultiWriteOperator.Factory(
catalogLoader,
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index 28b137a93e..35286e3a88 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkCatalogFactory;
@@ -162,8 +162,7 @@ public class FlinkCdcSyncDatabaseSinkITCase extends
AbstractTestBase {
Options catalogOptions = new Options();
catalogOptions.set("warehouse", tempDir.toString());
- Catalog.Loader catalogLoader =
- () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ CatalogLoader catalogLoader = () ->
FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
new FlinkCdcSyncDatabaseSinkBuilder<TestCdcEvent>()
.withInput(source)
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 8b19391f3e..9fccaac992 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
@@ -159,8 +159,7 @@ public class FlinkCdcSyncTableSinkITCase extends
AbstractTestBase {
Options catalogOptions = new Options();
catalogOptions.set("warehouse", tempDir.toString());
- Catalog.Loader catalogLoader =
- () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ CatalogLoader catalogLoader = () ->
FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
new CdcSinkBuilder<TestCdcEvent>()
.withInput(source)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index dd95c48af8..3407735b4b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
@@ -519,7 +520,7 @@ public class FlinkCatalog extends AbstractCatalog {
// Although catalog.createTable will copy the default options, but
we need this info
// here before create table, such as
table-default.kafka.bootstrap.servers defined in
// catalog options. Temporarily, we copy the default options here.
-
Catalog.tableDefaultOptions(catalog.options()).forEach(options::putIfAbsent);
+
CatalogUtils.tableDefaultOptions(catalog.options()).forEach(options::putIfAbsent);
options.put(REGISTER_TIMEOUT.key(),
logStoreAutoRegisterTimeout.toString());
registerLogSystem(catalog, identifier, options, classLoader);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 30e32d62ef..4490023e7b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.LogicalTypeConversion;
@@ -99,7 +100,7 @@ public abstract class ActionBase implements Action {
env.execute(name);
}
- protected Catalog.Loader catalogLoader() {
+ protected CatalogLoader catalogLoader() {
// to make the action workflow serializable
Options catalogOptions = this.catalogOptions;
return () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java
index 88730132ef..e2fd5a9d31 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.compact;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
@@ -47,7 +47,7 @@ public class MultiAwareBucketTableScan extends
MultiTableScanBase<Tuple2<Split,
protected transient Map<Identifier, StreamTableScan> scansMap;
public MultiAwareBucketTableScan(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
index f5940740b6..805e8da0a4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.compact;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -59,7 +60,7 @@ public abstract class MultiTableScanBase<T> implements
AutoCloseable {
protected boolean isStreaming;
public MultiTableScanBase(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
index da86b93af5..2ad2642b62 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.compact;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
@@ -41,7 +41,7 @@ public class MultiUnawareBucketTableScan
protected transient Map<Identifier,
UnawareAppendTableCompactionCoordinator> tablesMap;
public MultiUnawareBucketTableScan(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
index 83d51f302e..07ec7d165e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.compact.UnawareBucketCompactor;
import org.apache.paimon.options.Options;
@@ -56,7 +57,7 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
LoggerFactory.getLogger(AppendOnlyMultiTableCompactionWorkerOperator.class);
private final String commitUser;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
// support multi table compaction
private transient Map<Identifier, UnawareBucketCompactor>
compactorContainer;
@@ -67,7 +68,7 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
private AppendOnlyMultiTableCompactionWorkerOperator(
StreamOperatorParameters<MultiTableCommittable> parameters,
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
String commitUser,
Options options) {
super(parameters, options);
@@ -188,9 +189,9 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
MultiTableUnawareAppendCompactionTask,
MultiTableCommittable> {
private final String commitUser;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
- public Factory(Catalog.Loader catalogLoader, String commitUser,
Options options) {
+ public Factory(CatalogLoader catalogLoader, String commitUser, Options
options) {
super(options);
this.commitUser = commitUser;
this.catalogLoader = catalogLoader;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index 25f76ce976..53f1bf165d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.Options;
@@ -55,14 +55,14 @@ public class CombinedTableCompactorSink implements
Serializable {
private static final String WRITER_NAME = "Writer";
private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
private final boolean ignorePreviousFiles;
private final boolean fullCompaction;
private final Options options;
public CombinedTableCompactorSink(
- Catalog.Loader catalogLoader, Options options, boolean
fullCompaction) {
+ CatalogLoader catalogLoader, Options options, boolean
fullCompaction) {
this.catalogLoader = catalogLoader;
this.ignorePreviousFiles = false;
this.fullCompaction = fullCompaction;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 58f6a38340..02a7e6c1b3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
@@ -72,7 +73,7 @@ public class MultiTablesStoreCompactOperator
private transient StoreSinkWriteState state;
private transient DataFileMetaSerializer dataFileMetaSerializer;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
protected Catalog catalog;
protected Map<Identifier, FileStoreTable> tables;
@@ -81,7 +82,7 @@ public class MultiTablesStoreCompactOperator
private MultiTablesStoreCompactOperator(
StreamOperatorParameters<MultiTableCommittable> parameters,
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
String initialCommitUser,
CheckpointConfig checkpointConfig,
boolean isStreaming,
@@ -324,7 +325,7 @@ public class MultiTablesStoreCompactOperator
/** {@link StreamOperatorFactory} of {@link
MultiTablesStoreCompactOperator}. */
public static class Factory
extends PrepareCommitOperator.Factory<RowData,
MultiTableCommittable> {
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
private final CheckpointConfig checkpointConfig;
private final boolean isStreaming;
private final boolean ignorePreviousFiles;
@@ -332,7 +333,7 @@ public class MultiTablesStoreCompactOperator
private final String initialCommitUser;
public Factory(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
String initialCommitUser,
CheckpointConfig checkpointConfig,
boolean isStreaming,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 537a98f97f..01acddb9ad 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.WrappedManifestCommittable;
@@ -56,12 +57,12 @@ public class StoreMultiCommitter
private final boolean ignoreEmptyCommit;
private final Map<String, String> dynamicOptions;
- public StoreMultiCommitter(Catalog.Loader catalogLoader, Context context) {
+ public StoreMultiCommitter(CatalogLoader catalogLoader, Context context) {
this(catalogLoader, context, false, Collections.emptyMap());
}
public StoreMultiCommitter(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Context context,
boolean ignoreEmptyCommit,
Map<String, String> dynamicOptions) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
index dbdf776014..d190b9ccf3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
@@ -58,8 +58,11 @@ public class PartitionListeners implements Closeable {
throws Exception {
List<PartitionListener> listeners = new ArrayList<>();
- ReportHmsListener.create(context.isRestored(), context.stateStore(),
table)
+ // partition statistics reporter
+ ReportPartStatsListener.create(context.isRestored(),
context.stateStore(), table)
.ifPresent(listeners::add);
+
+ // partition mark done
PartitionMarkDone.create(
context.streamingCheckpointEnabled(),
context.isRestored(),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
similarity index 85%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
index 853dc52c20..b75889d567 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
@@ -40,22 +40,24 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.paimon.catalog.Catalog.HIVE_LAST_UPDATE_TIME_PROP;
+import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
import static
org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
/** Action to report the table statistic from the latest snapshot to HMS. */
-public class HmsReporter implements Closeable {
+public class PartitionStatisticsReporter implements Closeable {
- private static final Logger LOG =
LoggerFactory.getLogger(HmsReporter.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(PartitionStatisticsReporter.class);
+
+ private static final String HIVE_LAST_UPDATE_TIME_PROP =
"transient_lastDdlTime";
private final MetastoreClient metastoreClient;
private final SnapshotReader snapshotReader;
private final SnapshotManager snapshotManager;
- public HmsReporter(FileStoreTable table, MetastoreClient client) {
+ public PartitionStatisticsReporter(FileStoreTable table, MetastoreClient
client) {
this.metastoreClient =
Preconditions.checkNotNull(client, "the metastore client
factory is null");
this.snapshotReader = table.newSnapshotReader();
@@ -90,7 +92,12 @@ public class HmsReporter implements Closeable {
statistic.put(NUM_FILES_PROP, String.valueOf(fileCount));
statistic.put(TOTAL_SIZE_PROP, String.valueOf(totalSize));
statistic.put(NUM_ROWS_PROP, String.valueOf(rowCount));
- statistic.put(HIVE_LAST_UPDATE_TIME_PROP,
String.valueOf(modifyTime / 1000));
+
+ String modifyTimeSeconds = String.valueOf(modifyTime / 1000);
+ statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
+
+ // kept only for backward compatibility with the Hive metastore
+ statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
LOG.info("alter partition {} with statistic {}.", partitionSpec,
statistic);
metastoreClient.alterPartition(partitionSpec, statistic,
modifyTime, true);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
similarity index 91%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
index 842dd012e8..ca51c3df5b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
@@ -49,7 +49,7 @@ import java.util.Set;
* This listener will collect data from the newly touched partition and then
decide when to trigger
* a report based on the partition's idle time.
*/
-public class ReportHmsListener implements PartitionListener {
+public class ReportPartStatsListener implements PartitionListener {
@SuppressWarnings("unchecked")
private static final ListStateDescriptor<Map<String, Long>>
PENDING_REPORT_STATE_DESC =
@@ -58,20 +58,20 @@ public class ReportHmsListener implements PartitionListener
{
new MapSerializer<>(StringSerializer.INSTANCE,
LongSerializer.INSTANCE));
private final InternalRowPartitionComputer partitionComputer;
- private final HmsReporter hmsReporter;
+ private final PartitionStatisticsReporter partitionStatisticsReporter;
private final ListState<Map<String, Long>> pendingPartitionsState;
private final Map<String, Long> pendingPartitions;
private final long idleTime;
- private ReportHmsListener(
+ private ReportPartStatsListener(
InternalRowPartitionComputer partitionComputer,
- HmsReporter hmsReporter,
+ PartitionStatisticsReporter partitionStatisticsReporter,
OperatorStateStore store,
boolean isRestored,
long idleTime)
throws Exception {
this.partitionComputer = partitionComputer;
- this.hmsReporter = hmsReporter;
+ this.partitionStatisticsReporter = partitionStatisticsReporter;
this.pendingPartitionsState =
store.getListState(PENDING_REPORT_STATE_DESC);
this.pendingPartitions = new HashMap<>();
if (isRestored) {
@@ -108,7 +108,7 @@ public class ReportHmsListener implements PartitionListener
{
try {
Map<String, Long> partitions = reportPartition(endInput);
for (Map.Entry<String, Long> entry : partitions.entrySet()) {
- hmsReporter.report(entry.getKey(), entry.getValue());
+ partitionStatisticsReporter.report(entry.getKey(),
entry.getValue());
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -138,7 +138,7 @@ public class ReportHmsListener implements PartitionListener
{
pendingPartitionsState.update(Collections.singletonList(pendingPartitions));
}
- public static Optional<ReportHmsListener> create(
+ public static Optional<ReportPartStatsListener> create(
boolean isRestored, OperatorStateStore stateStore, FileStoreTable
table)
throws Exception {
@@ -169,9 +169,9 @@ public class ReportHmsListener implements PartitionListener
{
coreOptions.legacyPartitionName());
return Optional.of(
- new ReportHmsListener(
+ new ReportPartStatsListener(
partitionComputer,
- new HmsReporter(
+ new PartitionStatisticsReporter(
table,
table.catalogEnvironment().metastoreClientFactory().create()),
stateStore,
@@ -182,8 +182,8 @@ public class ReportHmsListener implements PartitionListener
{
@Override
public void close() throws IOException {
- if (hmsReporter != null) {
- hmsReporter.close();
+ if (partitionStatisticsReporter != null) {
+ partitionStatisticsReporter.close();
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
index 415eddb037..ac6af6a14f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.source.operator.CombinedAwareBatchSource;
import org.apache.paimon.flink.source.operator.CombinedAwareStreamingSource;
@@ -44,7 +44,8 @@ import java.util.regex.Pattern;
* compactor jobs in combined mode.
*/
public class CombinedTableCompactorSourceBuilder {
- private final Catalog.Loader catalogLoader;
+
+ private final CatalogLoader catalogLoader;
private final Pattern includingPattern;
private final Pattern excludingPattern;
private final Pattern databasePattern;
@@ -55,7 +56,7 @@ public class CombinedTableCompactorSourceBuilder {
@Nullable private Duration partitionIdleTime = null;
public CombinedTableCompactorSourceBuilder(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern databasePattern,
Pattern includingPattern,
Pattern excludingPattern,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java
index c3a1258bb1..2f7a82c951 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.source.operator;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.compact.MultiAwareBucketTableScan;
import org.apache.paimon.flink.compact.MultiTableScanBase;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
@@ -54,7 +54,7 @@ public class CombinedAwareBatchSource extends
CombinedCompactorSource<Tuple2<Spl
private static final Logger LOGGER =
LoggerFactory.getLogger(CombinedAwareBatchSource.class);
public CombinedAwareBatchSource(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern) {
@@ -112,7 +112,7 @@ public class CombinedAwareBatchSource extends
CombinedCompactorSource<Tuple2<Spl
StreamExecutionEnvironment env,
String name,
TypeInformation<RowData> typeInfo,
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java
index 9bd4a84f57..a23a3b41a4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.source.operator;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.compact.MultiAwareBucketTableScan;
import org.apache.paimon.flink.compact.MultiTableScanBase;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
@@ -51,7 +51,7 @@ public class CombinedAwareStreamingSource extends
CombinedCompactorSource<Tuple2
private final long monitorInterval;
public CombinedAwareStreamingSource(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
@@ -107,7 +107,7 @@ public class CombinedAwareStreamingSource extends
CombinedCompactorSource<Tuple2
StreamExecutionEnvironment env,
String name,
TypeInformation<RowData> typeInfo,
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java
index f58d86cdd6..e292d2441c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.source.operator;
import org.apache.paimon.append.UnawareAppendCompactionTask;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.table.source.Split;
@@ -44,16 +44,17 @@ import java.util.regex.Pattern;
* single (non-parallel) monitoring task, it is responsible for the new Paimon
table.
*/
public abstract class CombinedCompactorSource<T> extends
AbstractNonCoordinatedSource<T> {
- private static final long serialVersionUID = 2L;
- protected final Catalog.Loader catalogLoader;
+ private static final long serialVersionUID = 3L;
+
+ protected final CatalogLoader catalogLoader;
protected final Pattern includingPattern;
protected final Pattern excludingPattern;
protected final Pattern databasePattern;
protected final boolean isStreaming;
public CombinedCompactorSource(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java
index 64f0c38f5a..5c0d9c42dd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source.operator;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.compact.MultiTableScanBase;
@@ -63,7 +64,7 @@ public class CombinedUnawareBatchSource
private static final Logger LOGGER =
LoggerFactory.getLogger(CombinedUnawareBatchSource.class);
public CombinedUnawareBatchSource(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern) {
@@ -121,7 +122,7 @@ public class CombinedUnawareBatchSource
public static DataStream<MultiTableUnawareAppendCompactionTask>
buildSource(
StreamExecutionEnvironment env,
String name,
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java
index 6ea1ead4db..2e38d538a9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.source.operator;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.compact.MultiTableScanBase;
import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan;
import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo;
@@ -48,7 +48,7 @@ public class CombinedUnawareStreamingSource
private final long monitorInterval;
public CombinedUnawareStreamingSource(
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
@@ -104,7 +104,7 @@ public class CombinedUnawareStreamingSource
public static DataStream<MultiTableUnawareAppendCompactionTask>
buildSource(
StreamExecutionEnvironment env,
String name,
- Catalog.Loader catalogLoader,
+ CatalogLoader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
index fbc8bb9d75..ae3099ec06 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.source.operator;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
@@ -60,18 +61,18 @@ public class MultiTablesReadOperator extends
AbstractStreamOperator<RowData>
private static final long serialVersionUID = 1L;
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
private final boolean isStreaming;
private Duration partitionIdleTime = null;
- public MultiTablesReadOperator(Catalog.Loader catalogLoader, boolean
isStreaming) {
+ public MultiTablesReadOperator(CatalogLoader catalogLoader, boolean
isStreaming) {
this.catalogLoader = catalogLoader;
this.isStreaming = isStreaming;
}
public MultiTablesReadOperator(
- Catalog.Loader catalogLoader, boolean isStreaming, Duration
partitionIdleTime) {
+ CatalogLoader catalogLoader, boolean isStreaming, Duration
partitionIdleTime) {
this.catalogLoader = catalogLoader;
this.isStreaming = isStreaming;
this.partitionIdleTime = partitionIdleTime;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
index 0864741a17..15fde93755 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source.operator;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.PartitionEntry;
@@ -54,12 +55,11 @@ public class MultiUnawareTablesReadOperator
private static final Logger LOG =
LoggerFactory.getLogger(MultiUnawareTablesReadOperator.class);
- private final Catalog.Loader catalogLoader;
+ private final CatalogLoader catalogLoader;
private final Duration partitionIdleTime;
- public MultiUnawareTablesReadOperator(
- Catalog.Loader catalogLoader, Duration partitionIdleTime) {
+ public MultiUnawareTablesReadOperator(CatalogLoader catalogLoader,
Duration partitionIdleTime) {
this.catalogLoader = catalogLoader;
this.partitionIdleTime = partitionIdleTime;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index d487d75925..0e85f559d9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
@@ -273,7 +274,7 @@ public class CompactorSinkITCase extends AbstractTestBase {
}
protected MultiTablesStoreCompactOperator.Factory
createMultiTablesCompactOperator(
- Catalog.Loader catalogLoader) throws Exception {
+ CatalogLoader catalogLoader) throws Exception {
return new MultiTablesStoreCompactOperator.Factory(
catalogLoader,
commitUser,
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 752679fb59..53e3a6dcb7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
@@ -78,7 +79,7 @@ class StoreMultiCommitterTest {
private String initialCommitUser;
private Path warehouse;
- private Catalog.Loader catalogLoader;
+ private CatalogLoader catalogLoader;
private Catalog catalog;
private Identifier firstTable;
private Identifier secondTable;
@@ -691,7 +692,7 @@ class StoreMultiCommitterTest {
return harness;
}
- private Catalog.Loader createCatalogLoader() {
+ private CatalogLoader createCatalogLoader() {
Options catalogOptions = createCatalogOptions(warehouse);
return () ->
CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HmsReporterTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
similarity index 95%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HmsReporterTest.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
index f245940da5..142a0c32f7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HmsReporterTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
@@ -49,8 +49,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-/** Test for {@link HmsReporter}. */
-public class HmsReporterTest {
+/** Test for {@link PartitionStatisticsReporter}. */
+public class PartitionStatisticsReporterTest {
@TempDir java.nio.file.Path tempDir;
@@ -131,7 +131,7 @@ public class HmsReporterTest {
}
};
- HmsReporter action = new HmsReporter(table, client);
+ PartitionStatisticsReporter action = new
PartitionStatisticsReporter(table, client);
long time = 1729598544974L;
action.report("c1=a/", time);
Assertions.assertThat(partitionParams).containsKey("c1=a/");
@@ -144,6 +144,8 @@ public class HmsReporterTest {
"591",
"numRows",
"1",
+ "lastUpdateTime",
+ String.valueOf(time / 1000),
"transient_lastDdlTime",
String.valueOf(time / 1000)));
action.close();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
index fba5f33807..3b41f39431 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
@@ -662,7 +663,7 @@ public class MultiTablesCompactorSourceBuilderITCase
extends AbstractTestBase
return b;
}
- private Catalog.Loader catalogLoader() {
+ private CatalogLoader catalogLoader() {
// to make the action workflow serializable
catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
return () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index c74ede9815..5744ac894d 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -103,7 +103,7 @@ import static
org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
import static org.apache.paimon.hive.HiveTableUtils.convertToFormatTable;
-import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
+import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
@@ -872,8 +872,8 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public boolean allowUpperCase() {
- return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(false);
+ public boolean caseSensitive() {
+ return catalogOptions.getOptional(CASE_SENSITIVE).orElse(false);
}
@Override
@@ -931,7 +931,6 @@ public class HiveCatalog extends AbstractCatalog {
public void repairTable(Identifier identifier) throws
TableNotExistException {
checkNotBranch(identifier, "repairTable");
checkNotSystemTable(identifier, "repairTable");
- validateIdentifierNameCaseInsensitive(identifier);
Path location = getTableLocation(identifier);
TableSchema tableSchema =
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index d6318c723f..de6e2414fc 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -69,7 +69,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.FORMAT_TABLE;
-import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
+import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
@@ -96,9 +96,9 @@ public class SparkCatalog extends SparkBaseCatalog implements
SupportFunction, S
CatalogContext catalogContext =
CatalogContext.create(Options.fromMap(options),
sessionState.newHadoopConf());
- // if spark is case-insensitive, set allow upper case to catalog
+ // if spark is case-insensitive, set the catalog's case-sensitive option to true
if (!sessionState.conf().caseSensitiveAnalysis()) {
- newOptions.put(ALLOW_UPPER_CASE.key(), "true");
+ newOptions.put(CASE_SENSITIVE.key(), "true");
}
options = new CaseInsensitiveStringMap(newOptions);
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 6b7b17b1b1..b57228fa44 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -62,7 +62,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
-import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
+import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static
org.apache.paimon.spark.SparkCatalogOptions.CREATE_UNDERLYING_SESSION_CATALOG;
@@ -331,9 +331,9 @@ public class SparkGenericCatalog extends SparkBaseCatalog
implements CatalogExte
options.put(DEFAULT_DATABASE.key(), sessionCatalogDefaultDatabase);
}
- // if spark is case-insensitive, set allow upper case to catalog
+ // if spark is case-insensitive, set the catalog's case-sensitive option to true
if (!sqlConf.caseSensitiveAnalysis()) {
- options.put(ALLOW_UPPER_CASE.key(), "true");
+ options.put(CASE_SENSITIVE.key(), "true");
}
}