This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 659cf39957 [core] Refactor MetastoreClient methods to simplify catalog (#4726)
659cf39957 is described below
commit 659cf3995752432aa939569ee244e548f70ec77e
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Dec 17 13:54:31 2024 +0800
[core] Refactor MetastoreClient methods to simplify catalog (#4726)
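
In short: MetastoreClient now works purely on partition specs
(LinkedHashMap<String, String>) instead of BinaryRow, its factory no longer
needs a TableSchema, deletePartition/markDone become
dropPartition/markPartitionDone, the batch methods addPartitions/dropPartitions
replace addPartitionsSpec, and alterPartition takes the new PartitionStats
interface instead of a raw parameter map. Callers convert BinaryRow partitions
to specs with InternalRowPartitionComputer.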
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 5 +-
.../metastore/AddPartitionCommitCallback.java | 11 +-
.../paimon/metastore/AddPartitionTagCallback.java | 2 +-
.../apache/paimon/metastore/MetastoreClient.java | 29 ++---
.../apache/paimon/metastore/PartitionStats.java | 64 +++++++++++
.../apache/paimon/operation/PartitionExpire.java | 16 ++-
.../actions/MarkPartitionDoneEventAction.java | 2 +-
.../paimon/table/AbstractFileStoreTable.java | 11 +-
.../paimon/operation/PartitionExpireTest.java | 65 +++++++++++-
.../partition/PartitionStatisticsReporter.java | 27 ++---
.../sink/partition/AddDonePartitionActionTest.java | 30 +++---
.../partition/PartitionStatisticsReporterTest.java | 45 +++-----
.../java/org/apache/paimon/hive/HiveCatalog.java | 31 +++---
.../apache/paimon/hive/HiveMetastoreClient.java | 118 +++++++++------------
.../paimon/spark/PaimonPartitionManagement.scala | 2 +-
15 files changed, 267 insertions(+), 191 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index db69092955..3fdefe6cac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -417,7 +417,7 @@ public abstract class AbstractCatalog implements Catalog {
lockFactory().orElse(null),
lockContext().orElse(null),
identifier),
- metastoreClientFactory(identifier, tableMeta.schema).orElse(null)));
+ metastoreClientFactory(identifier).orElse(null)));
CoreOptions options = table.coreOptions();
if (options.type() == TableType.OBJECT_TABLE) {
String objectLocation = options.objectLocation();
@@ -485,8 +485,7 @@ public abstract class AbstractCatalog implements Catalog {
throws TableNotExistException;
/** Get metastore client factory for the table specified by {@code identifier}. */
- public Optional<MetastoreClient.Factory> metastoreClientFactory(
- Identifier identifier, TableSchema schema) {
+ public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
return Optional.empty();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
index 599f88e512..26fb9ed48d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
@@ -25,6 +25,7 @@ import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;
@@ -48,9 +49,12 @@ public class AddPartitionCommitCallback implements CommitCallback {
.build();
private final MetastoreClient client;
+ private final InternalRowPartitionComputer partitionComputer;
- public AddPartitionCommitCallback(MetastoreClient client) {
+ public AddPartitionCommitCallback(
+ MetastoreClient client, InternalRowPartitionComputer partitionComputer) {
this.client = client;
+ this.partitionComputer = partitionComputer;
}
@Override
@@ -81,7 +85,10 @@ public class AddPartitionCommitCallback implements CommitCallback {
}
}
if (!newPartitions.isEmpty()) {
- client.addPartitions(newPartitions);
+ client.addPartitions(
+ newPartitions.stream()
+ .map(partitionComputer::generatePartValues)
+ .collect(Collectors.toList()));
newPartitions.forEach(partition -> cache.put(partition, true));
}
} catch (Exception e) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
index 70efe68e83..31bb521e88 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
@@ -49,7 +49,7 @@ public class AddPartitionTagCallback implements TagCallback {
LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
partitionSpec.put(partitionField, tagName);
try {
- client.deletePartition(partitionSpec);
+ client.dropPartition(partitionSpec);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
index 75f7af5abb..ccf5f38538 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
@@ -18,12 +18,9 @@
package org.apache.paimon.metastore;
-import org.apache.paimon.data.BinaryRow;
-
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
/**
* A metastore client related to a table. All methods of this interface operate on the same specific
@@ -31,32 +28,18 @@ import java.util.Map;
*/
public interface MetastoreClient extends AutoCloseable {
- void addPartition(BinaryRow partition) throws Exception;
-
- default void addPartitions(List<BinaryRow> partitions) throws Exception {
- for (BinaryRow partition : partitions) {
- addPartition(partition);
- }
- }
+ void addPartition(LinkedHashMap<String, String> partition) throws Exception;
- void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exception;
+ void addPartitions(List<LinkedHashMap<String, String>> partitions) throws Exception;
- default void addPartitionsSpec(List<LinkedHashMap<String, String>> partitionSpecsList)
- throws Exception {
- for (LinkedHashMap<String, String> partitionSpecs : partitionSpecsList) {
- addPartition(partitionSpecs);
- }
- }
+ void dropPartition(LinkedHashMap<String, String> partition) throws Exception;
- void deletePartition(LinkedHashMap<String, String> partitionSpec) throws Exception;
+ void dropPartitions(List<LinkedHashMap<String, String>> partitions) throws Exception;
- void markDone(LinkedHashMap<String, String> partitionSpec) throws Exception;
+ void markPartitionDone(LinkedHashMap<String, String> partition) throws Exception;
default void alterPartition(
- LinkedHashMap<String, String> partitionSpec,
- Map<String, String> parameters,
- long modifyTime,
- boolean ignoreIfNotExist)
+ LinkedHashMap<String, String> partition, PartitionStats partitionStats)
throws Exception {
throw new UnsupportedOperationException();
}
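For readability, the refactored interface assembled from the hunk above; every
method now takes partition specs, in single and batch form (other members, such
as the Factory sub-interface, are untouched by this patch and omitted here):

    public interface MetastoreClient extends AutoCloseable {

        void addPartition(LinkedHashMap<String, String> partition) throws Exception;

        void addPartitions(List<LinkedHashMap<String, String>> partitions) throws Exception;

        void dropPartition(LinkedHashMap<String, String> partition) throws Exception;

        void dropPartitions(List<LinkedHashMap<String, String>> partitions) throws Exception;

        void markPartitionDone(LinkedHashMap<String, String> partition) throws Exception;

        default void alterPartition(
                LinkedHashMap<String, String> partition, PartitionStats partitionStats)
                throws Exception {
            throw new UnsupportedOperationException();
        }
    }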
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java b/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java
new file mode 100644
index 0000000000..eacc400f52
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metastore;
+
+/** Statistic for partition. */
+public interface PartitionStats {
+
+ long numFiles();
+
+ long totalSize();
+
+ long numRows();
+
+ long lastUpdateTimeMillis();
+
+ static PartitionStats create(
+ long numFiles, long totalSize, long numRows, long lastUpdateTimeMillis) {
+ return new PartitionStats() {
+
+ @Override
+ public long numFiles() {
+ return numFiles;
+ }
+
+ @Override
+ public long totalSize() {
+ return totalSize;
+ }
+
+ @Override
+ public long numRows() {
+ return numRows;
+ }
+
+ @Override
+ public long lastUpdateTimeMillis() {
+ return lastUpdateTimeMillis;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "numFiles: %s, totalSize: %s, numRows: %s,
lastUpdateTimeMillis: %s",
+ numFiles, totalSize, numRows, lastUpdateTimeMillis);
+ }
+ };
+ }
+}
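A minimal usage sketch for the new statistics carrier (the client, spec, and
values here are illustrative, not part of this patch):

    // Build stats once and hand them to the metastore client; the Hive
    // implementation maps them onto numFiles/totalSize/numRows/lastUpdateTime.
    PartitionStats stats = PartitionStats.create(1, 591L, 1, System.currentTimeMillis());
    LinkedHashMap<String, String> spec = new LinkedHashMap<>();
    spec.put("dt", "20230101");
    metastoreClient.alterPartition(spec, stats);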
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index d432a37dfd..68ef8a1237 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -167,15 +167,13 @@ public class PartitionExpire {
}
private void deleteMetastorePartitions(List<Map<String, String>> partitions) {
- if (metastoreClient != null) {
- partitions.forEach(
- partition -> {
- try {
- metastoreClient.deletePartition(new LinkedHashMap<>(partition));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
+ if (metastoreClient != null && partitions.size() > 0) {
+ try {
+ metastoreClient.dropPartitions(
+ partitions.stream().map(LinkedHashMap::new).collect(Collectors.toList()));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
index a5ebe34051..8cc1c93ba9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
@@ -39,7 +39,7 @@ public class MarkPartitionDoneEventAction implements PartitionMarkDoneAction {
public void markDone(String partition) throws Exception {
LinkedHashMap<String, String> partitionSpec =
extractPartitionSpecFromPath(new Path(partition));
- metastoreClient.markDone(partitionSpec);
+ metastoreClient.markPartitionDone(partitionSpec);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 57966d24ce..7e008698c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -61,6 +61,7 @@ import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanne
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SimpleFileReader;
@@ -469,7 +470,15 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
if (options.partitionedTableInMetastore()
&& metastoreClientFactory != null
&& !tableSchema.partitionKeys().isEmpty()) {
- callbacks.add(new AddPartitionCommitCallback(metastoreClientFactory.create()));
+ InternalRowPartitionComputer partitionComputer =
+ new InternalRowPartitionComputer(
+ options.partitionDefaultName(),
+ tableSchema.logicalPartitionType(),
+ tableSchema.partitionKeys().toArray(new String[0]),
+ options.legacyPartitionName());
+ callbacks.add(
+ new AddPartitionCommitCallback(
+ metastoreClientFactory.create(), partitionComputer));
}
TagPreview tagPreview = TagPreview.create(options);
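The BinaryRow-to-spec conversion that used to live inside HiveMetastoreClient
now happens at call sites like this one, roughly as follows (a sketch;
partitionRow and client are assumed in scope):

    // InternalRowPartitionComputer renders a BinaryRow partition into an
    // ordered spec, e.g. a single partition key "dt" becomes {dt=20230101},
    // which is the only shape the refactored MetastoreClient accepts.
    LinkedHashMap<String, String> spec = partitionComputer.generatePartValues(partitionRow);
    client.addPartitions(Collections.singletonList(spec));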
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 931bac59c7..893fe1bf57 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -27,8 +27,12 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.CommitMessage;
@@ -54,6 +58,7 @@ import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -61,9 +66,11 @@ import java.util.concurrent.ThreadLocalRandom;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
+import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
import static org.apache.paimon.CoreOptions.PARTITION_TIMESTAMP_FORMATTER;
+import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.WRITE_ONLY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -75,12 +82,54 @@ public class PartitionExpireTest {
private Path path;
private FileStoreTable table;
+ private List<LinkedHashMap<String, String>> deletedPartitions;
@BeforeEach
public void beforeEach() {
path = new Path(tempDir.toUri());
}
+ private void newTable() {
+ LocalFileIO fileIO = LocalFileIO.create();
+ Options options = new Options();
+ options.set(PATH, path.toString());
+ Path tablePath = CoreOptions.path(options);
+ String branchName = CoreOptions.branch(options.toMap());
+ TableSchema tableSchema = new SchemaManager(fileIO, tablePath, branchName).latest().get();
+ deletedPartitions = new ArrayList<>();
+ MetastoreClient.Factory factory =
+ () ->
+ new MetastoreClient() {
+ @Override
+ public void addPartition(LinkedHashMap<String, String> partition) {}
+
+ @Override
+ public void addPartitions(
+ List<LinkedHashMap<String, String>> partitions) {}
+
+ @Override
+ public void dropPartition(LinkedHashMap<String, String> partition) {
+ deletedPartitions.add(partition);
+ }
+
+ @Override
+ public void dropPartitions(
+ List<LinkedHashMap<String, String>> partitions) {
+ deletedPartitions.addAll(partitions);
+ }
+
+ @Override
+ public void markPartitionDone(LinkedHashMap<String, String> partition) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {}
+ };
+ CatalogEnvironment env = new CatalogEnvironment(null, null, Lock.emptyFactory(), factory);
+ table = FileStoreTableFactory.create(fileIO, path, tableSchema, env);
+ }
+
@Test
public void testNonPartitionedTable() {
SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), path);
@@ -108,7 +157,7 @@ public class PartitionExpireTest {
emptyList(),
Collections.emptyMap(),
""));
- table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+ newTable();
write("20230101", "11");
write("abcd", "12");
write("20230101", "12");
@@ -129,9 +178,9 @@ public class PartitionExpireTest {
RowType.of(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE).getFields(),
singletonList("f0"),
emptyList(),
- Collections.emptyMap(),
+ Collections.singletonMap(METASTORE_PARTITIONED_TABLE.key(), "true"),
""));
- table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+ newTable();
write("20230101", "11");
write("20230101", "12");
@@ -156,6 +205,12 @@ public class PartitionExpireTest {
expire.expire(date(8), Long.MAX_VALUE);
assertThat(read()).isEmpty();
+
+ assertThat(deletedPartitions)
+ .containsExactlyInAnyOrder(
+ new LinkedHashMap<>(Collections.singletonMap("f0",
"20230101")),
+ new LinkedHashMap<>(Collections.singletonMap("f0",
"20230103")),
+ new LinkedHashMap<>(Collections.singletonMap("f0",
"20230105")));
}
@Test
@@ -169,7 +224,7 @@ public class PartitionExpireTest {
Collections.emptyMap(),
""));
- table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+ newTable();
// disable compaction and snapshot expiration
table = table.copy(Collections.singletonMap(WRITE_ONLY.key(), "true"));
String commitUser = UUID.randomUUID().toString();
@@ -243,7 +298,7 @@ public class PartitionExpireTest {
emptyList(),
Collections.emptyMap(),
""));
- table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+ newTable();
table = newExpireTable();
List<CommitMessage> commitMessages = write("20230101", "11");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
index b75889d567..ced37726f1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.metastore.PartitionStats;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
@@ -35,15 +36,9 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
-import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
-import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
-import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
-import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
/** Action to report the table statistic from the latest snapshot to HMS. */
@@ -51,8 +46,6 @@ public class PartitionStatisticsReporter implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(PartitionStatisticsReporter.class);
- private static final String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime";
-
private final MetastoreClient metastoreClient;
private final SnapshotReader snapshotReader;
private final SnapshotManager snapshotManager;
@@ -64,7 +57,7 @@ public class PartitionStatisticsReporter implements Closeable {
this.snapshotManager = table.snapshotManager();
}
- public void report(String partition, long modifyTime) throws Exception {
+ public void report(String partition, long modifyTimeMillis) throws Exception {
Snapshot snapshot = snapshotManager.latestSnapshot();
if (snapshot != null) {
LinkedHashMap<String, String> partitionSpec =
@@ -88,19 +81,11 @@ public class PartitionStatisticsReporter implements Closeable {
totalSize += fileMeta.fileSize();
}
}
- Map<String, String> statistic = new HashMap<>();
- statistic.put(NUM_FILES_PROP, String.valueOf(fileCount));
- statistic.put(TOTAL_SIZE_PROP, String.valueOf(totalSize));
- statistic.put(NUM_ROWS_PROP, String.valueOf(rowCount));
-
- String modifyTimeSeconds = String.valueOf(modifyTime / 1000);
- statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
-
- // just for being compatible with hive metastore
- statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
- LOG.info("alter partition {} with statistic {}.", partitionSpec,
statistic);
- metastoreClient.alterPartition(partitionSpec, statistic,
modifyTime, true);
+ PartitionStats partitionStats =
+ PartitionStats.create(fileCount, totalSize, rowCount,
modifyTimeMillis);
+ LOG.info("alter partition {} with statistic {}.", partitionSpec,
partitionStats);
+ metastoreClient.alterPartition(partitionSpec, partitionStats);
}
}
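With this change the reporter no longer knows any Hive property names; it just
assembles a PartitionStats and hands it over. A usage sketch (the reporter's
construction is not shown in this hunk; the arguments mirror the test later in
this patch):

    // report(partitionPath, modifyTimeMillis) scans the latest snapshot,
    // aggregates file count, size and row count, and calls
    // metastoreClient.alterPartition(spec, stats).
    reporter.report("c1=a/", 1729598544974L);
    reporter.close();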
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
index fca5dcf0ed..3bdbdd20ad 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
@@ -18,15 +18,15 @@
package org.apache.paimon.flink.sink.partition;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.metastore.PartitionStats;
import org.apache.paimon.partition.actions.AddDonePartitionAction;
import org.junit.jupiter.api.Test;
import java.util.HashSet;
import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,39 +41,41 @@ class AddDonePartitionActionTest {
Set<String> donePartitions = new HashSet<>();
MetastoreClient metastoreClient =
new MetastoreClient() {
+
@Override
- public void addPartition(BinaryRow partition) {
- throw new UnsupportedOperationException();
+ public void addPartition(LinkedHashMap<String, String> partition) {
+ donePartitions.add(generatePartitionPath(partition));
+ }
+
+ @Override
+ public void addPartitions(List<LinkedHashMap<String, String>> partitions) {
+ partitions.forEach(this::addPartition);
}
@Override
- public void addPartition(LinkedHashMap<String, String> partitionSpec) {
- donePartitions.add(generatePartitionPath(partitionSpec));
+ public void dropPartition(LinkedHashMap<String, String> partition) {
+ throw new UnsupportedOperationException();
}
@Override
- public void deletePartition(LinkedHashMap<String, String> partitionSpec) {
+ public void dropPartitions(List<LinkedHashMap<String, String>> partitions) {
throw new UnsupportedOperationException();
}
@Override
- public void markDone(LinkedHashMap<String, String> partitionSpec)
- throws Exception {
+ public void markPartitionDone(LinkedHashMap<String, String> partitions) {
throw new UnsupportedOperationException();
}
@Override
public void alterPartition(
LinkedHashMap<String, String> partitionSpec,
- Map<String, String> parameters,
- long modifyTime,
- boolean ignoreIfNotExist)
- throws Exception {
+ PartitionStats partitionStats) {
throw new UnsupportedOperationException();
}
@Override
- public void close() throws Exception {
+ public void close() {
closed.set(true);
}
};
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
index 142a0c32f7..0f761efa22 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
@@ -18,12 +18,12 @@
package org.apache.paimon.flink.sink.partition;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.metastore.PartitionStats;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
@@ -35,7 +35,6 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.PartitionPathUtils;
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
@@ -86,47 +85,47 @@ public class PartitionStatisticsReporterTest {
BatchTableCommit committer = table.newBatchWriteBuilder().newCommit();
committer.commit(messages);
AtomicBoolean closed = new AtomicBoolean(false);
- Map<String, Map<String, String>> partitionParams = Maps.newHashMap();
+ Map<String, PartitionStats> partitionParams = Maps.newHashMap();
MetastoreClient client =
new MetastoreClient() {
+
+ @Override
+ public void addPartition(LinkedHashMap<String, String> partition) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
- public void addPartition(BinaryRow partition) throws Exception {
+ public void addPartitions(List<LinkedHashMap<String, String>> partitions) {
throw new UnsupportedOperationException();
}
@Override
- public void addPartition(LinkedHashMap<String, String> partitionSpec)
- throws Exception {
+ public void dropPartition(LinkedHashMap<String, String> partition) {
throw new UnsupportedOperationException();
}
@Override
- public void deletePartition(LinkedHashMap<String, String> partitionSpec)
- throws Exception {
+ public void dropPartitions(List<LinkedHashMap<String, String>> partitions) {
throw new UnsupportedOperationException();
}
@Override
- public void markDone(LinkedHashMap<String, String> partitionSpec)
- throws Exception {
+ public void markPartitionDone(LinkedHashMap<String, String> partitionSpec) {
throw new UnsupportedOperationException();
}
@Override
public void alterPartition(
LinkedHashMap<String, String> partitionSpec,
- Map<String, String> parameters,
- long modifyTime,
- boolean ignoreIfNotExist)
- throws Exception {
+ PartitionStats partitionStats) {
partitionParams.put(
PartitionPathUtils.generatePartitionPath(partitionSpec),
- parameters);
+ partitionStats);
}
@Override
- public void close() throws Exception {
+ public void close() {
closed.set(true);
}
};
@@ -135,19 +134,9 @@ public class PartitionStatisticsReporterTest {
long time = 1729598544974L;
action.report("c1=a/", time);
Assertions.assertThat(partitionParams).containsKey("c1=a/");
- Assertions.assertThat(partitionParams.get("c1=a/"))
+ Assertions.assertThat(partitionParams.get("c1=a/").toString())
.isEqualTo(
- ImmutableMap.of(
- "numFiles",
- "1",
- "totalSize",
- "591",
- "numRows",
- "1",
- "lastUpdateTime",
- String.valueOf(time / 1000),
- "transient_lastDdlTime",
- String.valueOf(time / 1000)));
+ "numFiles: 1, totalSize: 591, numRows: 1,
lastUpdateTimeMillis: 1729598544974");
action.close();
Assertions.assertThat(closed).isTrue();
}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 5744ac894d..f5ae504850 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -27,7 +27,6 @@ import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.client.ClientPool;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.pool.CachedClientPool;
@@ -48,6 +47,7 @@ import org.apache.paimon.table.FormatTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.view.View;
@@ -191,13 +191,12 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public Optional<MetastoreClient.Factory> metastoreClientFactory(
- Identifier identifier, TableSchema schema) {
+ public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
Identifier tableIdentifier =
new Identifier(identifier.getDatabaseName(), identifier.getTableName());
return Optional.of(
new HiveMetastoreClient.Factory(
- tableIdentifier, schema, hiveConf, clientClassName, options));
+ tableIdentifier, hiveConf, clientClassName, options));
}
@Override
@@ -350,9 +349,8 @@ public class HiveCatalog extends AbstractCatalog {
new HiveMetastoreClient(
new Identifier(
identifier.getDatabaseName(), identifier.getTableName()),
- tableSchema,
clients);
- metastoreClient.deletePartition(new LinkedHashMap<>(partitionSpec));
+ metastoreClient.dropPartition(new LinkedHashMap<>(partitionSpec));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -610,7 +608,7 @@ public class HiveCatalog extends AbstractCatalog {
lockFactory().orElse(null),
lockContext().orElse(null),
identifier),
- metastoreClientFactory(identifier, tableMeta.schema()).orElse(null)));
+ metastoreClientFactory(identifier).orElse(null)));
} catch (TableNotExistException ignore) {
}
@@ -968,14 +966,19 @@ public class HiveCatalog extends AbstractCatalog {
// repair partitions
if (!tableSchema.partitionKeys().isEmpty() && !newTable.getPartitionKeys().isEmpty()) {
// Do not close client, it is for HiveCatalog
+ CoreOptions options = new CoreOptions(tableSchema.options());
+ InternalRowPartitionComputer partitionComputer =
+ new InternalRowPartitionComputer(
+ options.partitionDefaultName(),
+ tableSchema.logicalPartitionType(),
+ tableSchema.partitionKeys().toArray(new String[0]),
+ options.legacyPartitionName());
@SuppressWarnings("resource")
- HiveMetastoreClient metastoreClient =
- new HiveMetastoreClient(identifier, tableSchema, clients);
- List<BinaryRow> partitions =
- getTable(identifier).newReadBuilder().newScan().listPartitions();
- for (BinaryRow partition : partitions) {
- metastoreClient.addPartition(partition);
- }
+ HiveMetastoreClient metastoreClient = new HiveMetastoreClient(identifier, clients);
+ metastoreClient.addPartitions(
+ getTable(identifier).newReadBuilder().newScan().listPartitions().stream()
+ .map(partitionComputer::generatePartValues)
+ .collect(Collectors.toList()));
}
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index 885fa463e5..f7be538c25 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -18,15 +18,12 @@
package org.apache.paimon.hive;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.client.ClientPool;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.hive.pool.CachedClientPool;
import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.metastore.PartitionStats;
import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -39,34 +36,30 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.thrift.TException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
+import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
+
/** {@link MetastoreClient} for Hive tables. */
public class HiveMetastoreClient implements MetastoreClient {
+ private static final String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime";
+
private final Identifier identifier;
- private final InternalRowPartitionComputer partitionComputer;
private final ClientPool<IMetaStoreClient, TException> clients;
private final StorageDescriptor sd;
- HiveMetastoreClient(
- Identifier identifier,
- TableSchema schema,
- ClientPool<IMetaStoreClient, TException> clients)
+ HiveMetastoreClient(Identifier identifier, ClientPool<IMetaStoreClient, TException> clients)
throws TException, InterruptedException {
this.identifier = identifier;
- CoreOptions options = new CoreOptions(schema.options());
- this.partitionComputer =
- new InternalRowPartitionComputer(
- options.partitionDefaultName(),
- schema.logicalPartitionType(),
- schema.partitionKeys().toArray(new String[0]),
- options.legacyPartitionName());
-
this.clients = clients;
this.sd =
this.clients
@@ -79,22 +72,9 @@ public class HiveMetastoreClient implements MetastoreClient {
}
@Override
- public void addPartition(BinaryRow partition) throws Exception {
- addPartition(partitionComputer.generatePartValues(partition));
- }
-
- @Override
- public void addPartitions(List<BinaryRow> partitions) throws Exception {
- addPartitionsSpec(
- partitions.stream()
- .map(partitionComputer::generatePartValues)
- .collect(Collectors.toList()));
- }
-
- @Override
- public void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
+ public void addPartition(LinkedHashMap<String, String> partition) throws Exception {
Partition hivePartition =
- toHivePartition(partitionSpec, (int) (System.currentTimeMillis() / 1000));
+ toHivePartition(partition, (int) (System.currentTimeMillis() / 1000));
clients.execute(
client -> {
try {
@@ -105,11 +85,10 @@ public class HiveMetastoreClient implements MetastoreClient {
}
@Override
- public void addPartitionsSpec(List<LinkedHashMap<String, String>> partitionSpecsList)
- throws Exception {
+ public void addPartitions(List<LinkedHashMap<String, String>> partitions) throws Exception {
int currentTime = (int) (System.currentTimeMillis() / 1000);
List<Partition> hivePartitions =
- partitionSpecsList.stream()
+ partitions.stream()
.map(partitionSpec -> toHivePartition(partitionSpec, currentTime))
.collect(Collectors.toList());
clients.execute(client -> client.add_partitions(hivePartitions, true, false));
@@ -117,43 +96,45 @@ public class HiveMetastoreClient implements MetastoreClient {
@Override
public void alterPartition(
- LinkedHashMap<String, String> partitionSpec,
- Map<String, String> parameters,
- long modifyTime,
- boolean ignoreIfNotExist)
+ LinkedHashMap<String, String> partition, PartitionStats
partitionStats)
throws Exception {
- List<String> partitionValues = new ArrayList<>(partitionSpec.values());
- int currentTime = (int) (modifyTime / 1000);
- Partition hivePartition;
+ List<String> partitionValues = new ArrayList<>(partition.values());
+
+ Map<String, String> statistic = new HashMap<>();
+ statistic.put(NUM_FILES_PROP, String.valueOf(partitionStats.numFiles()));
+ statistic.put(TOTAL_SIZE_PROP, String.valueOf(partitionStats.totalSize()));
+ statistic.put(NUM_ROWS_PROP, String.valueOf(partitionStats.numRows()));
+
+ String modifyTimeSeconds = String.valueOf(partitionStats.lastUpdateTimeMillis() / 1000);
+ statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
+
+ // just for being compatible with hive metastore
+ statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
+
try {
- hivePartition =
+ Partition hivePartition =
clients.run(
client ->
client.getPartition(
identifier.getDatabaseName(),
identifier.getObjectName(),
partitionValues));
+ hivePartition.setValues(partitionValues);
+ hivePartition.setLastAccessTime((int) (partitionStats.lastUpdateTimeMillis() / 1000));
+ hivePartition.getParameters().putAll(statistic);
+ clients.execute(
+ client ->
+ client.alter_partition(
+ identifier.getDatabaseName(),
+ identifier.getObjectName(),
+ hivePartition));
} catch (NoSuchObjectException e) {
- if (ignoreIfNotExist) {
- return;
- } else {
- throw e;
- }
+ // do nothing if the partition not exists
}
-
- hivePartition.setValues(partitionValues);
- hivePartition.setLastAccessTime(currentTime);
- hivePartition.getParameters().putAll(parameters);
- clients.execute(
- client ->
- client.alter_partition(
- identifier.getDatabaseName(),
- identifier.getObjectName(),
- hivePartition));
}
@Override
- public void deletePartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
+ public void dropPartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
List<String> partitionValues = new ArrayList<>(partitionSpec.values());
try {
clients.execute(
@@ -169,7 +150,14 @@ public class HiveMetastoreClient implements MetastoreClient {
}
@Override
- public void markDone(LinkedHashMap<String, String> partitionSpec) throws Exception {
+ public void dropPartitions(List<LinkedHashMap<String, String>> partitions) throws Exception {
+ for (LinkedHashMap<String, String> partition : partitions) {
+ dropPartition(partition);
+ }
+ }
+
+ @Override
+ public void markPartitionDone(LinkedHashMap<String, String> partitionSpec) throws Exception {
try {
clients.execute(
client ->
@@ -213,19 +201,13 @@ public class HiveMetastoreClient implements MetastoreClient {
private static final long serialVersionUID = 1L;
private final Identifier identifier;
- private final TableSchema schema;
private final SerializableHiveConf hiveConf;
private final String clientClassName;
private final Options options;
public Factory(
- Identifier identifier,
- TableSchema schema,
- HiveConf hiveConf,
- String clientClassName,
- Options options) {
+ Identifier identifier, HiveConf hiveConf, String clientClassName, Options options) {
this.identifier = identifier;
- this.schema = schema;
this.hiveConf = new SerializableHiveConf(hiveConf);
this.clientClassName = clientClassName;
this.options = options;
@@ -236,7 +218,7 @@ public class HiveMetastoreClient implements MetastoreClient {
HiveConf conf = hiveConf.conf();
try {
return new HiveMetastoreClient(
- identifier, schema, new CachedClientPool(conf, options, clientClassName));
+ identifier, new CachedClientPool(conf, options, clientClassName));
} catch (TException e) {
throw new RuntimeException(
"Can not get table " + identifier + " info from
metastore.", e);
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 840f1341a6..c385f243ae 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -78,7 +78,7 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
// sync to metastore with delete partitions
if (clientFactory != null && fileStoreTable.coreOptions().partitionedTableInMetastore()) {
metastoreClient = clientFactory.create()
- toPaimonPartitions(rows).foreach(metastoreClient.deletePartition)
+ metastoreClient.dropPartitions(toPaimonPartitions(rows).toSeq.asJava)
}
} finally {
commit.close()