This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 61ed29271b [HUDI-4326] Fix hive sync serde properties (#6722)
61ed29271b is described below
commit 61ed29271bf8e68bb8d5e62fc66463bb8bdabd17
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Sep 20 20:22:30 2022 +0800
[HUDI-4326] Fix hive sync serde properties (#6722)
---
.../java/org/apache/hudi/hive/HiveSyncTool.java | 4 +--
.../org/apache/hudi/hive/HoodieHiveSyncClient.java | 40 +++++++++-------------
.../org/apache/hudi/hive/TestHiveSyncTool.java | 6 ++--
.../hudi/sync/common/HoodieMetaSyncOperations.java | 5 +--
4 files changed, 23 insertions(+), 32 deletions(-)
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index ce3114b92e..d0a40bbc18 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -290,9 +290,7 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
// Sync the table properties if the schema has changed
if (config.getString(HIVE_TABLE_PROPERTIES) != null ||
config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) {
syncClient.updateTableProperties(tableName, tableProperties);
- HoodieFileFormat baseFileFormat =
HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
- String serDeFormatClassName =
HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
- syncClient.updateTableSerDeInfo(tableName, serDeFormatClassName,
serdeProperties);
+ syncClient.updateSerdeProperties(tableName, serdeProperties);
LOG.info("Sync table properties for " + tableName + ", table
properties is: " + tableProperties);
}
schemaChanged = true;
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index a740c93d65..1bdc87ab11 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -18,11 +18,14 @@
package org.apache.hudi.hive;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.MapUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hive.ddl.DDLExecutor;
import org.apache.hudi.hive.ddl.HMSDDLExecutor;
import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor;
@@ -52,6 +55,7 @@ import java.util.stream.Collectors;
import static
org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
+import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;
@@ -115,7 +119,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
@Override
public void updateTableProperties(String tableName, Map<String, String>
tableProperties) {
- if (tableProperties == null || tableProperties.isEmpty()) {
+ if (MapUtils.isNullOrEmpty(tableProperties)) {
return;
}
try {
@@ -130,34 +134,29 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
}
}
- /**
- * Update the table serde properties to the table.
- */
@Override
- public void updateTableSerDeInfo(String tableName, String serdeClass,
Map<String, String> serdeProperties) {
- if (serdeProperties == null || serdeProperties.isEmpty()) {
+ public void updateSerdeProperties(String tableName, Map<String, String>
serdeProperties) {
+ if (MapUtils.isNullOrEmpty(serdeProperties)) {
return;
}
try {
+ serdeProperties.putIfAbsent("serialization.format", "1");
Table table = client.getTable(databaseName, tableName);
- serdeProperties.put("serialization.format", "1");
StorageDescriptor storageDescriptor = table.getSd();
SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo();
if (serdeInfo != null && serdeInfo.getParametersSize() ==
serdeProperties.size()) {
Map<String, String> parameters = serdeInfo.getParameters();
- boolean same = true;
- for (String key : serdeProperties.keySet()) {
- if (!parameters.containsKey(key) |
!parameters.get(key).equals(serdeProperties.get(key))) {
- same = false;
- break;
- }
- }
- if (same) {
- LOG.debug("Table " + tableName + " serdeProperties already up to
date, skip update");
+ boolean different = serdeProperties.entrySet().stream().anyMatch(e ->
+ !parameters.containsKey(e.getKey()) ||
!parameters.get(e.getKey()).equals(e.getValue()));
+ if (!different) {
+ LOG.debug("Table " + tableName + " serdeProperties already up to
date, skip update serde properties.");
return;
}
}
- storageDescriptor.setSerdeInfo(new SerDeInfo(null, serdeClass,
serdeProperties));
+
+ HoodieFileFormat baseFileFormat =
HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
+ String serDeClassName =
HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
+ storageDescriptor.setSerdeInfo(new SerDeInfo(null, serDeClassName,
serdeProperties));
client.alter_table(databaseName, tableName, table);
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to update table serde info for
table: "
@@ -353,11 +352,4 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
}
}
- Table getTable(String tableName) {
- try {
- return client.getTable(databaseName, tableName);
- } catch (TException e) {
- throw new HoodieHiveSyncException(String.format("Database: %s, Table: %s
does not exist", databaseName, tableName), e);
- }
- }
}
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index ba9d33a662..acd75595fb 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -39,6 +39,7 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -159,9 +160,6 @@ public class TestHiveSyncTool {
assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should exist after sync
completes");
- assertEquals("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
-
hiveClient.getTable(HiveTestUtil.TABLE_NAME).getSd().getSerdeInfo().getSerializationLib(),
- "SerDe info not updated or does not match");
assertEquals(hiveClient.getMetastoreSchema(HiveTestUtil.TABLE_NAME).size(),
hiveClient.getStorageSchema().getColumns().size() + 1,
"Hive Schema should match the table schema + partition field");
@@ -303,6 +301,7 @@ public class TestHiveSyncTool {
hiveDriver.run("SHOW CREATE TABLE " + dbTableName);
hiveDriver.getResults(results);
String ddl = String.join("\n", results);
+ assertTrue(ddl.contains(String.format("ROW FORMAT SERDE \n '%s'",
ParquetHiveSerDe.class.getName())));
assertTrue(ddl.contains("'path'='" + HiveTestUtil.basePath + "'"));
if (syncAsDataSourceTable) {
assertTrue(ddl.contains("'" + ConfigUtils.IS_QUERY_AS_RO_TABLE +
"'='false'"));
@@ -405,6 +404,7 @@ public class TestHiveSyncTool {
hiveDriver.run("SHOW CREATE TABLE " + dbTableName);
hiveDriver.getResults(results);
String ddl = String.join("\n", results);
+ assertTrue(ddl.contains(String.format("ROW FORMAT SERDE \n '%s'",
ParquetHiveSerDe.class.getName())));
assertTrue(ddl.contains("'path'='" + HiveTestUtil.basePath + "'"));
assertTrue(ddl.toLowerCase().contains("create external table"));
if (syncAsDataSourceTable) {
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
index 5afcf80a87..49edbffd45 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
@@ -174,9 +174,10 @@ public interface HoodieMetaSyncOperations {
}
/**
- * Update the table SerDeInfo in metastore.
+ * Update the SerDe properties in metastore.
*/
- default void updateTableSerDeInfo(String tableName, String serdeClass,
Map<String, String> serdeProperties) {
+ default void updateSerdeProperties(String tableName, Map<String, String>
serdeProperties) {
+
}
/**