This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 424dc45db9 [core] format table: fix hms sync partition (#6634)
424dc45db9 is described below
commit 424dc45db96c34b787eb1d4764b06c10eba2eed0
Author: jerry <[email protected]>
AuthorDate: Sat Nov 22 17:29:51 2025 +0800
[core] format table: fix hms sync partition (#6634)
---
.../java/org/apache/paimon/rest/RESTCatalog.java | 2 +-
.../paimon/table/format/FormatTableCommit.java | 35 ++++++++++++++++--
.../apache/paimon/rest/MockRESTCatalogTest.java | 43 ++++++++++++++++++----
.../java/org/apache/paimon/hive/HiveCatalog.java | 38 +++++++++++++++----
.../paimon/spark/sql/FormatTableTestBase.scala | 18 ++++++---
5 files changed, 113 insertions(+), 23 deletions(-)
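For readers skimming the patch: the fix routes format-table partition sync through a new public HiveCatalog#createPartitionsUtil overload, and FormatTableCommit reaches it reflectively (presumably so paimon-core avoids a compile-time dependency on the Hive catalog module; that rationale is an inference, not stated in the commit). A minimal standalone sketch of that dispatch, using only names that appear in the diff below; error handling and surrounding state are simplified:

// Sketch only, not part of the commit.
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.DelegateCatalog;
import org.apache.paimon.catalog.Identifier;

import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class HmsPartitionSyncSketch {
    static void syncPartition(
            Catalog catalog,
            Identifier table,
            Map<String, String> partitionSpec,
            boolean onlyValueInPath)
            throws Exception {
        // Unwrap delegating catalogs so reflection sees the concrete HiveCatalog class.
        Catalog target =
                catalog instanceof DelegateCatalog ? ((DelegateCatalog) catalog).wrapped() : catalog;
        Method createPartitionsUtil =
                target.getClass()
                        .getDeclaredMethod(
                                "createPartitionsUtil", Identifier.class, List.class, boolean.class);
        createPartitionsUtil.setAccessible(true);
        // The boolean decides whether the HMS partition location keeps "key=value" segments
        // or only the values, matching how the data was actually written.
        createPartitionsUtil.invoke(
                target, table, Collections.singletonList(partitionSpec), onlyValueInPath);
    }
}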
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index f225acda88..a35e9682c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -111,7 +111,7 @@ public class RESTCatalog implements Catalog {
context.preferIO(),
context.fallbackIO());
        this.dataTokenEnabled = api.options().get(RESTTokenFileIO.DATA_TOKEN_ENABLED);
-        this.tableDefaultOptions = CatalogUtils.tableDefaultOptions(context.options().toMap());
+        this.tableDefaultOptions = CatalogUtils.tableDefaultOptions(this.context.options().toMap());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
index 80182f8246..ed98c6ba50 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.format;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.DelegateCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
@@ -39,6 +40,7 @@ import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -120,7 +122,9 @@ public class FormatTableCommit implements BatchTableCommit {
staticPartitions,
formatTablePartitionOnlyValueInPath,
partitionKeys);
- partitionSpecs.add(staticPartitions);
+ if (staticPartitions.size() == partitionKeys.size()) {
+ partitionSpecs.add(staticPartitions);
+ }
if (overwrite) {
deletePreviousDataFile(partitionPath);
}
@@ -150,8 +154,20 @@ public class FormatTableCommit implements BatchTableCommit {
}
for (Map<String, String> partitionSpec : partitionSpecs) {
if (hiveCatalog != null) {
-                hiveCatalog.createPartitions(
-                        tableIdentifier, Collections.singletonList(partitionSpec));
+ try {
+ if (hiveCatalog instanceof DelegateCatalog) {
+                        hiveCatalog = ((DelegateCatalog) hiveCatalog).wrapped();
+ }
+ Method hiveCreatePartitionsInHmsMethod =
+ getHiveCreatePartitionsInHmsMethod();
+ hiveCreatePartitionsInHmsMethod.invoke(
+ hiveCatalog,
+ tableIdentifier,
+ Collections.singletonList(partitionSpec),
+ formatTablePartitionOnlyValueInPath);
+ } catch (Exception ex) {
+                    throw new RuntimeException("Failed to sync partition to hms", ex);
+ }
}
}
@@ -161,6 +177,19 @@ public class FormatTableCommit implements BatchTableCommit {
}
}
+    private Method getHiveCreatePartitionsInHmsMethod() throws NoSuchMethodException {
+ Method hiveCreatePartitionsInHmsMethod =
+ hiveCatalog
+ .getClass()
+ .getDeclaredMethod(
+ "createPartitionsUtil",
+ Identifier.class,
+ List.class,
+ boolean.class);
+ hiveCreatePartitionsInHmsMethod.setAccessible(true);
+ return hiveCreatePartitionsInHmsMethod;
+ }
+
private LinkedHashMap<String, String> extractPartitionSpecFromPath(
Path partitionPath, List<String> partitionKeys) {
if (formatTablePartitionOnlyValueInPath) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
index f6080fb312..150a8e28eb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
@@ -179,13 +179,24 @@ class MockRESTCatalogTest extends RESTCatalogTest {
@Test
void testCreateTableDefaultOptions() throws Exception {
-        options.set(TABLE_DEFAULT_OPTION_PREFIX + "default-key", "default-value");
+ String catalogConfigKey = "default-key";
+        options.set(TABLE_DEFAULT_OPTION_PREFIX + catalogConfigKey, "default-value");
RESTCatalog restCatalog = initCatalog(false);
        Identifier identifier = Identifier.create("db1", "new_table_default_options");
restCatalog.createDatabase(identifier.getDatabaseName(), true);
restCatalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, true);
assertEquals(
-                restCatalog.getTable(identifier).options().get("default-key"), "default-value");
+                restCatalog.getTable(identifier).options().get(catalogConfigKey), "default-value");
+ restCatalog.dropTable(identifier, true);
+ restCatalog.dropDatabase(identifier.getDatabaseName(), true, true);
+
+ String catalogConfigInServerKey = "default-key-in-server";
+        restCatalog = initCatalogWithDefaultTableOption(catalogConfigInServerKey, "default-value");
+ restCatalog.createDatabase(identifier.getDatabaseName(), true);
+ restCatalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, true);
+ assertEquals(
+                restCatalog.getTable(identifier).options().get(catalogConfigInServerKey),
+ "default-value");
}
@Test
@@ -329,14 +340,28 @@ class MockRESTCatalogTest extends RESTCatalogTest {
}
    private RESTCatalog initCatalog(boolean enableDataToken) throws IOException {
- return initCatalog(enableDataToken, Collections.emptyMap());
+        return initCatalogUtil(enableDataToken, Collections.emptyMap(), null, null);
}
    private RESTCatalog initCatalog(boolean enableDataToken, Map<String, String> extraOptions)
throws IOException {
+ return initCatalogUtil(enableDataToken, extraOptions, null, null);
+ }
+
+    private RESTCatalog initCatalogWithDefaultTableOption(String key, String value)
+ throws IOException {
+ return initCatalogUtil(false, Collections.emptyMap(), key, value);
+ }
+
+ private RESTCatalog initCatalogUtil(
+ boolean enableDataToken,
+ Map<String, String> extraOptions,
+ String createTableDefaultKey,
+ String createTableDefaultValue)
+ throws IOException {
String restWarehouse = UUID.randomUUID().toString();
- this.config =
- new ConfigResponse(
+ Map<String, String> defaultConf =
+ new HashMap<>(
ImmutableMap.of(
RESTCatalogInternalOptions.PREFIX.key(),
"paimon",
@@ -345,8 +370,12 @@ class MockRESTCatalogTest extends RESTCatalogTest {
RESTTokenFileIO.DATA_TOKEN_ENABLED.key(),
enableDataToken + "",
CatalogOptions.WAREHOUSE.key(),
- restWarehouse),
- ImmutableMap.of());
+ restWarehouse));
+ if (createTableDefaultKey != null) {
+ defaultConf.put(
+                    TABLE_DEFAULT_OPTION_PREFIX + createTableDefaultKey, createTableDefaultValue);
+ }
+ this.config = new ConfigResponse(defaultConf, ImmutableMap.of());
restCatalogServer =
                new RESTCatalogServer(dataPath, this.authProvider, this.config, restWarehouse);
restCatalogServer.start();
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index f225d5baf7..6f2c14a297 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -358,11 +358,32 @@ public class HiveCatalog extends AbstractCatalog {
                Identifier.create(identifier.getDatabaseName(), identifier.getTableName());
Table hmsTable = getHmsTable(tableIdentifier);
TableSchema schema = loadTableSchema(tableIdentifier, hmsTable);
-
if (!metastorePartitioned(schema)) {
+ LOG.info("partition create: not metastorePartitioned");
return;
}
+ createPartitionsUtil(
+ identifier,
+ partitions,
+                new CoreOptions(schema.options()).formatTablePartitionOnlyValueInPath(),
+ hmsTable);
+ }
+
+ public void createPartitionsUtil(
+ Identifier identifier,
+ List<Map<String, String>> partitions,
+ boolean partitionOnlyValueInPath)
+ throws TableNotExistException {
+ Table hmsTable = getHmsTable(identifier);
+        createPartitionsUtil(identifier, partitions, partitionOnlyValueInPath, hmsTable);
+ }
+ private void createPartitionsUtil(
+ Identifier tableIdentifier,
+ List<Map<String, String>> partitions,
+ boolean partitionOnlyValueInPath,
+ Table hmsTable)
+ throws TableNotExistException {
int currentTime = (int) (System.currentTimeMillis() / 1000);
StorageDescriptor sd = hmsTable.getSd();
String dataFilePath = getDataFilePath(tableIdentifier, hmsTable);
@@ -370,13 +391,14 @@ public class HiveCatalog extends AbstractCatalog {
for (Map<String, String> partitionSpec : partitions) {
Partition hivePartition = new Partition();
StorageDescriptor newSd = new StorageDescriptor(sd);
- hivePartition.setDbName(identifier.getDatabaseName());
- hivePartition.setTableName(identifier.getTableName());
+ hivePartition.setDbName(tableIdentifier.getDatabaseName());
+ hivePartition.setTableName(tableIdentifier.getTableName());
hivePartition.setValues(new ArrayList<>(partitionSpec.values()));
hivePartition.setSd(newSd);
hivePartition.setCreateTime(currentTime);
hivePartition.setLastAccessTime(currentTime);
-            String partitionLocation = getPartitionLocation(dataFilePath, partitionSpec);
+ String partitionLocation =
+                    getPartitionLocation(dataFilePath, partitionSpec, partitionOnlyValueInPath);
            locationHelper.specifyPartitionLocation(hivePartition, partitionLocation);
hivePartitions.add(hivePartition);
}
@@ -429,10 +451,12 @@ public class HiveCatalog extends AbstractCatalog {
: tableLocation;
}
-    private String getPartitionLocation(String dataFilePath, Map<String, String> partitionSpec) {
+ private String getPartitionLocation(
+            String dataFilePath, Map<String, String> partitionSpec, boolean onlyValue) {
return dataFilePath
+ Path.SEPARATOR
-                + PartitionPathUtils.generatePartitionPath(new LinkedHashMap<>(partitionSpec));
+ + PartitionPathUtils.generatePartitionPathUtil(
+ new LinkedHashMap<>(partitionSpec), onlyValue);
}
@Override
@@ -1344,7 +1368,7 @@ public class HiveCatalog extends AbstractCatalog {
} catch (NoSuchObjectException e) {
throw new TableNotExistException(identifier);
} catch (TException e) {
- if (e.getMessage().contains("Permission.NotAllow")) {
+            if (e.getMessage() != null && e.getMessage().contains("Permission.NotAllow")) {
throw new TableNoPermissionException(identifier, e);
}
throw new RuntimeException(
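The createPartitionsUtil overload above is public, so code that can depend on paimon-hive-catalog may call it directly rather than reflectively. A short usage sketch; the catalog instance, database, table, and partition values are hypothetical:

// Sketch only: direct use of the public overload added above.
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.hive.HiveCatalog;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

class CreatePartitionsSketch {
    static void example(HiveCatalog hiveCatalog) throws Exception {
        // Partition spec keyed in partition-column order, e.g. ds=2024/hh=10.
        Map<String, String> spec = new LinkedHashMap<>();
        spec.put("ds", "2024");
        spec.put("hh", "10");
        hiveCatalog.createPartitionsUtil(
                Identifier.create("mydb", "my_format_table"),
                Collections.singletonList(spec),
                /* partitionOnlyValueInPath */ false);
    }
}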
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index 573f3c8899..ad2384d49d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -51,8 +51,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
withTable(tableName) {
val hiveCatalog =
paimonCatalog.asInstanceOf[DelegateCatalog].wrapped().asInstanceOf[HiveCatalog]
- sql(
-        s"CREATE TABLE $tableName (f0 INT) USING CSV PARTITIONED BY (`ds` bigint) TBLPROPERTIES ('metastore.partitioned-table'='true')")
+      sql(s"CREATE TABLE $tableName (f0 INT) USING CSV PARTITIONED BY (`ds` bigint)")
sql(s"INSERT INTO $tableName VALUES (1, 2023)")
var ds = 2023L
checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1, ds)))
@@ -60,12 +59,21 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
assert(partitions.size == 0)
sql(s"DROP TABLE $tableName")
sql(
-        s"CREATE TABLE $tableName (f0 INT) USING CSV PARTITIONED BY (`ds` bigint) TBLPROPERTIES ('format-table.commit-hive-sync-url'='$hiveUri', 'metastore.partitioned-table'='true')")
+        s"CREATE TABLE $tableName (f0 INT) USING CSV PARTITIONED BY (`ds` bigint, `hh` int) TBLPROPERTIES ('format-table.commit-hive-sync-url'='$hiveUri')")
ds = 2024L
- sql(s"INSERT INTO $tableName VALUES (1, $ds)")
- checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1, ds)))
+ val hh = 10
+ sql(s"INSERT OVERWRITE $tableName PARTITION(ds=$ds, hh) VALUES (1, $hh)")
+ checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1, ds, hh)))
      partitions = hiveCatalog.listPartitionsFromHms(Identifier.create(hiveDbName, tableName))
assert(partitions.get(0).getValues.get(0).equals(ds.toString))
+      assert(partitions.get(0).getSd.getLocation.split("/").last.equals(s"hh=$hh"))
+ sql(s"DROP TABLE $tableName")
+      sql(s"CREATE TABLE $tableName (f0 INT) USING CSV PARTITIONED BY (`ds` bigint) " +
+        s"TBLPROPERTIES ('format-table.commit-hive-sync-url'='$hiveUri', 'format-table.partition-path-only-value'='true')")
+ ds = 2025L
+ sql(s"INSERT INTO $tableName VALUES (1, $ds)")
+      partitions = hiveCatalog.listPartitionsFromHms(Identifier.create(hiveDbName, tableName))
+      assert(partitions.get(0).getSd.getLocation.split("/").last.equals(ds.toString))
}
}
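For reference, the two location layouts the updated test asserts on (a final "hh=10" segment versus a bare "2025" segment) correspond to the boolean threaded through HiveCatalog#getPartitionLocation above. A minimal sketch; the org.apache.paimon.utils package for PartitionPathUtils and the exact output strings are assumptions, while the method and its boolean parameter come from this commit:

// Sketch only: the two partition path styles exercised by the test above.
import org.apache.paimon.utils.PartitionPathUtils;

import java.util.LinkedHashMap;

class PartitionPathSketch {
    public static void main(String[] args) {
        LinkedHashMap<String, String> spec = new LinkedHashMap<>();
        spec.put("ds", "2025");
        // false: conventional Hive-style layout, last segment like "ds=2025".
        System.out.println(PartitionPathUtils.generatePartitionPathUtil(spec, false));
        // true: only the value is kept in the path, last segment like "2025",
        // matching 'format-table.partition-path-only-value'='true'.
        System.out.println(PartitionPathUtils.generatePartitionPathUtil(spec, true));
    }
}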