This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 164b35a79e6d fix: Recreate tables during meta sync on certain exceptions (#14317)
164b35a79e6d is described below
commit 164b35a79e6decb868780964b2bdac1fc35f23b7
Author: Lin Liu <[email protected]>
AuthorDate: Thu Feb 12 09:11:10 2026 -0800
fix: Recreate tables during meta sync on certain exceptions (#14317)
---------
Co-authored-by: Aditya Goenka <[email protected]>
Co-authored-by: Vamsi <[email protected]>
---
.../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 78 ++++++
.../hudi/aws/sync/AwsGlueCatalogSyncTool.java | 6 +
.../hudi/config/GlueCatalogSyncClientConfig.java | 8 +
.../java/org/apache/hudi/common/fs/FSUtils.java | 12 +
.../hudi/common/testutils/SchemaTestUtil.java | 27 +++
.../org/apache/hudi/common/fs/TestFSUtils.java | 81 ++++---
.../java/org/apache/hudi/hive/HiveSyncConfig.java | 8 +
.../java/org/apache/hudi/hive/HiveSyncTool.java | 95 ++++++--
.../org/apache/hudi/hive/HoodieHiveSyncClient.java | 49 ++++
.../org/apache/hudi/hive/TestHiveSyncTool.java | 262 ++++++++++++++++++---
.../hudi/sync/common/HoodieMetaSyncOperations.java | 29 +++
.../apache/hudi/sync/common/HoodieSyncClient.java | 6 +
12 files changed, 575 insertions(+), 86 deletions(-)
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 53389f3e463f..b657c5c4e7af 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -55,6 +55,7 @@ import software.amazon.awssdk.services.glue.model.CreateTableRequest;
import software.amazon.awssdk.services.glue.model.CreateTableResponse;
import software.amazon.awssdk.services.glue.model.DatabaseInput;
import software.amazon.awssdk.services.glue.model.DeletePartitionIndexRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
import software.amazon.awssdk.services.glue.model.GetPartitionIndexesRequest;
@@ -517,6 +518,52 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
}
}
+ @Override
+ public void createOrReplaceTable(String tableName,
+ MessageType storageSchema,
+ String inputFormatClass,
+ String outputFormatClass,
+ String serdeClass,
+ Map<String, String> serdeProperties,
+ Map<String, String> tableProperties) {
+
+ if (!tableExists(tableName)) {
+ // if table doesn't exist before, directly create new table.
+      createTable(tableName, storageSchema, inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
+ return;
+ }
+
+ try {
+ // validate before dropping the table
+      validateSchemaAndProperties(tableName, storageSchema, inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
+      // drop and recreate the actual table
+      dropTable(tableName);
+      createTable(tableName, storageSchema, inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to recreate the table " + tableId(databaseName, tableName), e);
+    }
+  }
+
+  /**
+   * Creates a temp table with the given schema and properties to ensure
+   * table creation succeeds before dropping and recreating the actual table.
+   * This ensures the actual table is not dropped in case there are any
+   * issues with table creation because of the provided schema or properties.
+   */
+ private void validateSchemaAndProperties(String tableName,
+ MessageType storageSchema,
+ String inputFormatClass,
+ String outputFormatClass,
+ String serdeClass,
+ Map<String, String> serdeProperties,
+                                           Map<String, String> tableProperties) {
+ // Create a temp table to validate the schema and properties
+ String tempTableName = generateTempTableName(tableName);
+    createTable(tempTableName, storageSchema, inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
+ // drop the temp table
+ dropTable(tempTableName);
+ }
+
@Override
public void createTable(String tableName,
MessageType storageSchema,
@@ -822,6 +869,27 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
}
}
+ @Override
+ public void dropTable(String tableName) {
+ DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
+ .databaseName(databaseName)
+ .name(tableName)
+ .build();
+
+ try {
+ awsGlue.deleteTable(deleteTableRequest).get();
+ LOG.info("Successfully deleted table in AWS Glue: {}.{}", databaseName,
tableName);
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+        // If an InterruptedException was thrown, restore the thread's
+        // interrupted flag (set it back to true) so that subsequent
+        // handlers can be interrupted as well
+        Thread.currentThread().interrupt();
+ }
+ throw new HoodieGlueSyncException("Failed to delete table " +
tableId(databaseName, tableName), e);
+ }
+ }
+
@Override
public Option<String> getLastReplicatedTime(String tableName) {
     throw new UnsupportedOperationException("Not supported: `getLastReplicatedTime`");
@@ -842,6 +910,16 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
     return new GluePartitionFilterGenerator().generatePushDownFilter(writtenPartitions, partitionFields, (HiveSyncConfig) config);
}
+ @Override
+ public String getTableLocation(String tableName) {
+ try {
+ Table table = getTable(awsGlue, databaseName, tableName);
+ return table.storageDescriptor().location();
+ } catch (Exception e) {
+ throw new HoodieGlueSyncException("Fail to get base path for the table "
+ tableId(databaseName, tableName), e);
+ }
+ }
+
private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
List<Column> cols = new ArrayList<>();
for (String key : mapSchema.keySet()) {
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
index e86a6b99f5cc..57070b98b6ad 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import java.util.Properties;
+import static org.apache.hudi.config.GlueCatalogSyncClientConfig.RECREATE_GLUE_TABLE_ON_ERROR;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
/**
@@ -52,6 +53,11 @@ public class AwsGlueCatalogSyncTool extends HiveSyncTool {
syncClient = new AWSGlueCatalogSyncClient(hiveSyncConfig);
}
+ @Override
+ protected boolean shouldRecreateAndSyncTable() {
+ return config.getBooleanOrDefault(RECREATE_GLUE_TABLE_ON_ERROR);
+ }
+
public static void main(String[] args) {
     final HiveSyncConfig.HiveSyncConfigParams params = new HiveSyncConfig.HiveSyncConfigParams();
JCommander cmd = JCommander.newBuilder().addObject(params).build();
diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java b/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
index fd198eff6263..8c12c229cdce 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
@@ -89,4 +89,12 @@ public class GlueCatalogSyncClientConfig extends HoodieConfig {
       .withDocumentation(String.join(" ", "Specify the partitions fields to index on aws glue. Separate the fields by semicolon.",
           "By default, when the feature is enabled, all the partition will be indexed.",
           "You can create up to three indexes, separate them by comma. Eg: col1;col2;col3,col2,col3"));
+
+  public static final ConfigProperty<Boolean> RECREATE_GLUE_TABLE_ON_ERROR = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "recreate_table_on_error")
+      .defaultValue(false)
+      .sinceVersion("0.14.2")
+      .markAdvanced()
+      .withDocumentation("Glue sync may fail if the Glue table exists with partitions differing from the Hoodie table or if schema evolution is not supported by Glue. "
+          + "Enabling this configuration will drop and recreate the table to match the Hoodie config");
}
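For reference, a minimal sketch of enabling the new flag when assembling sync
properties. The key suffix comes from the diff above; the
"hoodie.datasource.meta.sync.glue." prefix (the value of
GLUE_CLIENT_PROPERTY_PREFIX) is an assumption here, not something this diff
shows:

    import java.util.Properties;

    public class GlueRecreateOnErrorExample {
      public static void main(String[] args) {
        Properties syncProps = new Properties();
        // Full key is GLUE_CLIENT_PROPERTY_PREFIX + "recreate_table_on_error";
        // the prefix value below is assumed for illustration only.
        syncProps.setProperty("hoodie.datasource.meta.sync.glue.recreate_table_on_error", "true");
        System.out.println(syncProps);
      }
    }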
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index f415488775b5..7fd41e437e0a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -43,6 +43,7 @@ import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.storage.StorageSchemes;
import org.apache.hudi.storage.inline.InLineFSUtils;
+import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -747,6 +748,17 @@ public class FSUtils {
return pathInfoList;
}
+  public static boolean comparePathsWithoutScheme(String pathStr1, String pathStr2) {
+ Path pathWithoutScheme1 = getPathWithoutScheme(new Path(pathStr1));
+ Path pathWithoutScheme2 = getPathWithoutScheme(new Path(pathStr2));
+ return pathWithoutScheme1.equals(pathWithoutScheme2);
+ }
+
+ public static Path getPathWithoutScheme(Path path) {
+    return path.isUriPathAbsolute()
+        ? new Path(null, path.toUri().getAuthority(), path.toUri().getPath()) : path;
+ }
+
   // Converts s3a to s3
public static String s3aToS3(String s3aUrl) {
return s3aUrl.replaceFirst("(?i)^s3a://", "s3://");
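As a quick illustration of the new helper's semantics (a sketch; the unit
tests updated below exercise the same cases): two paths compare equal when
only the scheme differs.

    import org.apache.hudi.common.fs.FSUtils;

    public class ComparePathsDemo {
      public static void main(String[] args) {
        // Same bucket and path, schemes differ -> true
        System.out.println(FSUtils.comparePathsWithoutScheme(
            "s3://bucket/table/base", "s3a://bucket/table/base"));
        // Same scheme, different paths -> false
        System.out.println(FSUtils.comparePathsWithoutScheme(
            "s3a://bucket/table/a", "s3a://bucket/table/b"));
      }
    }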
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
index 37915c826c10..8d07aa85eb42 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
@@ -114,6 +114,24 @@ public final class SchemaTestUtil {
}
}
+  private static <T extends IndexedRecord> List<T> toRecords(Schema writerSchema, Schema readerSchema, String path)
+      throws IOException, URISyntaxException {
+    GenericDatumReader<T> reader = new GenericDatumReader<>(writerSchema, readerSchema);
+ Path dataPath = initializeSampleDataPath(path);
+
+ try (Stream<String> stream = Files.lines(dataPath)) {
+ return stream.map(s -> {
+ try {
+          return reader.read(null, DecoderFactory.get().jsonDecoder(writerSchema, s));
+ } catch (IOException e) {
+ throw new HoodieIOException("Could not read data from " + path, e);
+ }
+ }).collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new HoodieIOException("Could not read data from " + path, e);
+ }
+ }
+
/**
* Required to register the necessary JAR:// file system.
* @return Path to the sample data in the resource file.
@@ -129,6 +147,15 @@ public final class SchemaTestUtil {
}
}
+  private static Path initializeSampleDataPath(String path) throws IOException, URISyntaxException {
+ URI resource = SchemaTestUtil.class.getResource(path).toURI();
+ if (resource.toString().contains("!")) {
+ return uriToPath(resource);
+ } else {
+ return Paths.get(SchemaTestUtil.class.getResource(path).toURI());
+ }
+ }
+
public static Path uriToPath(URI uri) throws IOException {
final Map<String, String> env = new HashMap<>();
final String[] array = uri.toString().split("!");
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index 76a24ad8c03a..6745f2c76f62 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
@@ -34,7 +33,6 @@ import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hadoop.conf.Configuration;
@@ -48,7 +46,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
-import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.nio.file.Files;
@@ -57,7 +54,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
@@ -564,22 +560,42 @@ public class TestFSUtils extends HoodieCommonTestHarness {
}
@Test
-  public void testGetPathInfoUnderPartition() throws IOException {
-    StoragePath hoodieTempDir = getHoodieTempDir();
-    HoodieStorage storage = metaClient.getStorage();
-    prepareTestDirectory(storage, hoodieTempDir);
-    List<Option<StoragePathInfo>> fileStatusList = FSUtils.getPathInfoUnderPartition(
-        storage,
-        new StoragePath(baseUri.toString(), ".hoodie/.temp"),
-        new HashSet<>(Collections.singletonList("file3.txt")),
-        false);
-    assertEquals(1, fileStatusList.size());
+  void testComparePathsWithoutScheme() {
+    String path1 = "s3://test_bucket_one/table/base/path";
+    String path2 = "s3a://test_bucket_two/table/base/path";
+    assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should return false since bucket names don't match");
+
+    path1 = "s3a://test_bucket_one/table/new_base/path";
+    path2 = "s3a://test_bucket_one/table/old_base/path";
+    assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should return false since paths don't match");
+
+    path1 = "s3://test_bucket_one/table/base/path";
+    path2 = "s3a://test_bucket_one/table/base/path";
+    assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return true since paths match ignoring the file scheme");
+
+    path1 = "s3a://test_bucket_one/table/base/path";
+    path2 = "s3a://test_bucket_one/table/base/path";
+    assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return true since bucket names and paths match");
+
+    path1 = "gs://test_bucket_one/table/base/path";
+    path2 = "gs://test_bucket_two/table/base/path";
+    assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should return false since bucket names don't match");
-    assertThrows(HoodieIOException.class, () -> FSUtils.getPathInfoUnderPartition(
-        storage,
-        new StoragePath(baseUri.toString(), ".hoodie/.temp"),
-        new HashSet<>(Collections.singletonList("file4.txt")),
-        false));
+ path1 = "gs://test_bucket_one/table/base/path";
+ path2 = "gs://test_bucket_one/table/base/path";
+ assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return
true since bucket names and path matches");
+
+ path1 = "file:/var/table/base/path";
+ path2 = "/var/table/base/path";
+ assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return
true since path matches");
+
+ path1 = "file:/var/table/base/path";
+ path2 = "file:/var/table/old_base/path";
+ assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should
return false since path doesn't matches");
+
+ path1 = "table/base/path";
+ path2 = "table/base/path";
+ assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return
true since relative path doesn't matches");
}
@Test
@@ -594,23 +610,16 @@ public class TestFSUtils extends HoodieCommonTestHarness {
assertEquals("s3://my-bucket/s3a://another-bucket/another/path",
FSUtils.s3aToS3("s3a://my-bucket/s3a://another-bucket/another/path"));
}
- @ParameterizedTest
- @ValueSource(strings = {
- "gs://my-bucket/path/to/object",
- "gs://my-bucket",
- "gs://MY-BUCKET/PATH/TO/OBJECT",
- "https://myaccount.blob.core.windows.net/mycontainer/path/to/blob",
- "https://myaccount.blob.core.windows.net/MYCONTAINER/PATH/TO/BLOB",
- "https://example.com/path/to/resource",
- "http://example.com",
- "ftp://example.com/resource",
- "",
- "gs://my-bucket/path/to/s3a://object",
- "gs://my-bucket s3a://my-object",
- })
-
- void testUriDoesNotChange(String uri) {
- assertEquals(uri, FSUtils.s3aToS3(uri));
+ @Test
+ void testGetPathWithoutScheme() {
+    String path1 = "s3://test_bucket_one/table/base/path";
+    assertEquals("//test_bucket_one/table/base/path", FSUtils.getPathWithoutScheme(new Path(path1)).toUri().toString(), "s3 scheme should be stripped");
+
+    path1 = "s3a://test_bucket_one/table/base/path";
+    assertEquals("//test_bucket_one/table/base/path", FSUtils.getPathWithoutScheme(new Path(path1)).toUri().toString(), "s3a scheme should be stripped");
+
+    path1 = "gs://test_bucket_one/table/base/path";
+    assertEquals("//test_bucket_one/table/base/path", FSUtils.getPathWithoutScheme(new Path(path1)).toUri().toString(), "gs scheme should be stripped");
}
private StoragePath getHoodieTempDir() {
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 331c8906bc55..9187349f38c6 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -88,6 +88,14 @@ public class HiveSyncConfig extends HoodieSyncConfig {
+ "filters exceed this size, will directly try to fetch all
partitions between the min/max."
+ "In case of glue metastore, this value should be reduced because
it has a filter length limit.");
+  public static final ConfigProperty<Boolean> RECREATE_HIVE_TABLE_ON_ERROR = ConfigProperty
+      .key("hoodie.datasource.hive_sync.recreate_table_on_error")
+      .defaultValue(false)
+      .sinceVersion("0.14.2")
+      .markAdvanced()
+      .withDocumentation("Hive sync may fail if the Hive table exists with partitions differing from the Hoodie table or if schema evolution is not supported by Hive. "
+          + "Enabling this configuration will drop and recreate the table to match the Hoodie config");
+
public static String getBucketSpec(String bucketCols, int bucketNum) {
return "CLUSTERED BY (" + bucketCols + " INTO " + bucketNum + " BUCKETS";
}
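As a usage sketch (not part of this commit), the new key rides along with the
existing hive_sync options on a Hudi Spark write; `df` and `basePath` below
are placeholders, not names from this diff:

    // Sketch: df is a Dataset<Row> and basePath a String, both assumed.
    df.write().format("hudi")
        .option("hoodie.datasource.hive_sync.enable", "true")
        .option("hoodie.datasource.hive_sync.recreate_table_on_error", "true")
        .mode("append")
        .save(basePath);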
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 9a42a8effd53..af969cce0757 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -18,6 +18,7 @@
package org.apache.hudi.hive;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieSyncTableStrategy;
import org.apache.hudi.common.util.ConfigUtils;
@@ -49,6 +50,7 @@ import static org.apache.hudi.common.util.StringUtils.nonEmpty;
 import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName;
 import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName;
 import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName;
+import static org.apache.hudi.hive.HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR;
 import static org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS;
@@ -89,7 +91,7 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
- private HiveSyncConfig config;
+ protected HiveSyncConfig config;
private final String databaseName;
private final String tableName;
@@ -180,11 +182,11 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
break;
case MERGE_ON_READ:
switch (HoodieSyncTableStrategy.valueOf(hiveSyncTableStrategy)) {
- case RO :
+ case RO:
// sync a RO table for MOR
syncHoodieTable(tableName, false, true);
break;
- case RT :
+ case RT:
// sync a RT table for MOR
syncHoodieTable(tableName, true, false);
break;
@@ -223,6 +225,48 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
     LOG.info("Trying to sync hoodie table " + tableName + " with base path " + syncClient.getBasePath()
         + " of type " + syncClient.getTableType());
+ // create database if needed
+ checkAndCreateDatabase();
+
+ final boolean tableExists = syncClient.tableExists(tableName);
+ // Get the parquet schema for this table looking at the latest commit
+    MessageType schema = syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
+    // if table exists and location of the metastore table doesn't match the hoodie base path, recreate the table
+    if (tableExists && !FSUtils.comparePathsWithoutScheme(syncClient.getBasePath(), syncClient.getTableLocation(tableName))) {
+      LOG.info("basepath is updated for the table {}", tableName);
+      recreateAndSyncHiveTable(tableName, useRealtimeInputFormat, readAsOptimized);
+ return;
+ }
+
+ boolean schemaChanged;
+ boolean propertiesChanged;
+    try {
+      if (tableExists) {
+        schemaChanged = syncSchema(tableName, schema);
+        propertiesChanged = syncProperties(tableName, useRealtimeInputFormat, readAsOptimized, schema);
+      } else {
+        syncFirstTime(tableName, useRealtimeInputFormat, readAsOptimized, schema);
+        schemaChanged = true;
+        propertiesChanged = true;
+      }
+
+      boolean partitionsChanged = validateAndSyncPartitions(tableName, tableExists);
+      boolean meetSyncConditions = schemaChanged || propertiesChanged || partitionsChanged;
+      if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) {
+        syncClient.updateLastCommitTimeSynced(tableName);
+      }
+      LOG.info("Sync complete for {}", tableName);
+    } catch (HoodieHiveSyncException ex) {
+      if (shouldRecreateAndSyncTable()) {
+        LOG.warn("failed to sync the table {}, trying to recreate", tableName, ex);
+        recreateAndSyncHiveTable(tableName, useRealtimeInputFormat, readAsOptimized);
+      } else {
+        throw new HoodieHiveSyncException("failed to sync the table " + tableName, ex);
+      }
+    }
+  }
+
+ private void checkAndCreateDatabase() {
// check if the database exists else create it
if (config.getBoolean(HIVE_AUTO_CREATE_DATABASE)) {
try {
@@ -239,22 +283,9 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
         throw new HoodieHiveSyncException("hive database does not exist " + databaseName);
}
}
+ }
-    final boolean tableExists = syncClient.tableExists(tableName);
-
-    // Get the parquet schema for this table looking at the latest commit
-    MessageType schema = syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
-    boolean schemaChanged;
-    boolean propertiesChanged;
-    if (tableExists) {
-      schemaChanged = syncSchema(tableName, schema);
-      propertiesChanged = syncProperties(tableName, useRealtimeInputFormat, readAsOptimized, schema);
-    } else {
-      syncFirstTime(tableName, useRealtimeInputFormat, readAsOptimized, schema);
-      schemaChanged = true;
-      propertiesChanged = true;
-    }
-
+  private boolean validateAndSyncPartitions(String tableName, boolean tableExists) {
     boolean syncIncremental = config.getBoolean(META_SYNC_INCREMENTAL);
     Option<String> lastCommitTimeSynced = (tableExists && syncIncremental)
         ? syncClient.getLastCommitTimeSynced(tableName) : Option.empty();
@@ -290,12 +321,34 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
LOG.info("Partitions dropped since last sync: {}",
droppedPartitions.size());
partitionsChanged = syncPartitions(tableName, writtenPartitionsSince,
droppedPartitions);
}
+ return partitionsChanged;
+ }
+
+ protected boolean shouldRecreateAndSyncTable() {
+ return config.getBooleanOrDefault(RECREATE_HIVE_TABLE_ON_ERROR);
+ }
-    boolean meetSyncConditions = schemaChanged || propertiesChanged || partitionsChanged;
-    if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) {
+  private void recreateAndSyncHiveTable(String tableName, boolean useRealtimeInputFormat, boolean readAsOptimized) {
+    LOG.info("recreating and syncing the table {}", tableName);
+    MessageType schema = syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
+    try {
+      createOrReplaceTable(tableName, useRealtimeInputFormat, readAsOptimized, schema);
+      syncAllPartitions(tableName);
       syncClient.updateLastCommitTimeSynced(tableName);
+    } catch (HoodieHiveSyncException ex) {
+      throw new HoodieHiveSyncException("failed to recreate the table for " + tableName, ex);
     }
-    LOG.info("Sync complete for " + tableName);
+  }
+
+  private void createOrReplaceTable(String tableName, boolean useRealtimeInputFormat, boolean readAsOptimized, MessageType schema) {
+    HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
+    String inputFormatClassName = getInputFormatClassName(baseFileFormat, useRealtimeInputFormat, config.getBooleanOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT));
+    String outputFormatClassName = getOutputFormatClassName(baseFileFormat);
+    String serDeFormatClassName = getSerDeClassName(baseFileFormat);
+    Map<String, String> serdeProperties = getSerdeProperties(readAsOptimized);
+    Map<String, String> tableProperties = getTableProperties(schema);
+    syncClient.createOrReplaceTable(tableName, schema, inputFormatClassName,
+        outputFormatClassName, serDeFormatClassName, serdeProperties, tableProperties);
}
private Map<String, String> getTableProperties(MessageType schema) {
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 38b4e30942ef..6c233e93fff2 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -246,6 +246,35 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
     return new PartitionFilterGenerator().generatePushDownFilter(writtenPartitions, partitionFields, config);
}
+ public void createOrReplaceTable(String tableName,
+ MessageType storageSchema,
+ String inputFormatClass,
+ String outputFormatClass,
+ String serdeClass,
+ Map<String, String> serdeProperties,
+ Map<String, String> tableProperties) {
+
+ if (!tableExists(tableName)) {
+      createTable(tableName, storageSchema, inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
+ return;
+ }
+ try {
+ // create temp table
+ String tempTableName = generateTempTableName(tableName);
+      createTable(tempTableName, storageSchema, inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
+
+ // if create table is successful, drop the actual table
+ // and rename temp table to actual table
+ dropTable(tableName);
+
+ Table table = client.getTable(databaseName, tempTableName);
+ table.setTableName(tableName);
+ client.alter_table(databaseName, tempTableName, table);
+ } catch (Exception ex) {
+ throw new HoodieHiveSyncException("failed to create table " +
tableId(databaseName, tableName), ex);
+ }
+ }
+
@Override
   public void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
                           String outputFormatClass, String serdeClass,
@@ -447,4 +476,24 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
   StorageDescriptor getMetastoreStorageDescriptor(String tableName) throws TException {
return client.getTable(databaseName, tableName).getSd();
}
+
+ @Override
+ public void dropTable(String tableName) {
+ try {
+ client.dropTable(databaseName, tableName);
+ LOG.info("Successfully deleted table in Hive: {}.{}", databaseName,
tableName);
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException("Failed to delete the table " +
tableId(databaseName, tableName), e);
+ }
+ }
+
+ @Override
+ public String getTableLocation(String tableName) {
+ try {
+ Table table = client.getTable(databaseName, tableName);
+ return table.getSd().getLocation();
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException("Failed to get the basepath of the
table " + tableId(databaseName, tableName), e);
+ }
+ }
}
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index e696bdd9e93a..680aecff8de5 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
@@ -87,6 +88,7 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
 import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getRelativePartitionPath;
 import static org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED;
+import static org.apache.hudi.hive.HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS;
@@ -245,7 +247,7 @@ public class TestHiveSyncTool {
throw new HoodieHiveSyncException("Failed to get the metastore location
from the table " + tableName, e);
}
}
-
+
@ParameterizedTest
@MethodSource("syncMode")
public void testSyncAllPartition() throws Exception {
@@ -288,7 +290,7 @@ public class TestHiveSyncTool {
"Table partitions should match the number of partitions we wrote");
// Drop partition with HMSDDLExecutor
     try (HMSDDLExecutor hmsExecutor =
-        new HMSDDLExecutor(hiveSyncConfig, IMetaStoreClientUtil.getMSC(hiveSyncConfig.getHiveConf()))) {
+             new HMSDDLExecutor(hiveSyncConfig, IMetaStoreClientUtil.getMSC(hiveSyncConfig.getHiveConf()))) {
       hmsExecutor.dropPartitionsToTable(HiveTestUtil.TABLE_NAME, Collections.singletonList("2010/02/03"));
}
@@ -1033,34 +1035,34 @@ public class TestHiveSyncTool {
     String roTableName = HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
     reInitHiveSyncClient();
     assertFalse(hiveClient.tableExists(roTableName),
-                "Table " + roTableName + " should not exist initially");
+        "Table " + roTableName + " should not exist initially");
     assertFalse(hiveClient.tableExists(snapshotTableName),
-                "Table " + snapshotTableName + " should not exist initially");
+        "Table " + snapshotTableName + " should not exist initially");
     reSyncHiveTable();
     switch (strategy) {
       case RO:
         assertFalse(hiveClient.tableExists(snapshotTableName),
-                    "Table " + snapshotTableName
-                        + " should not exist initially");
+            "Table " + snapshotTableName
+                + " should not exist initially");
         assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
-                   "Table " + HiveTestUtil.TABLE_NAME
-                       + " should exist after sync completes");
+            "Table " + HiveTestUtil.TABLE_NAME
+                + " should exist after sync completes");
         break;
       case RT:
         assertFalse(hiveClient.tableExists(roTableName),
-                    "Table " + roTableName
-                        + " should not exist initially");
+            "Table " + roTableName
+                + " should not exist initially");
         assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
-                   "Table " + HiveTestUtil.TABLE_NAME
-                       + " should exist after sync completes");
+            "Table " + HiveTestUtil.TABLE_NAME
+                + " should exist after sync completes");
         break;
       default:
         assertTrue(hiveClient.tableExists(roTableName),
-                   "Table " + roTableName
-                       + " should exist after sync completes");
+            "Table " + roTableName
+                + " should exist after sync completes");
         assertTrue(hiveClient.tableExists(snapshotTableName),
-                   "Table " + snapshotTableName
-                       + " should exist after sync completes");
+            "Table " + snapshotTableName
+                + " should exist after sync completes");
}
}
@@ -1077,11 +1079,11 @@ public class TestHiveSyncTool {
MessageType schema = hiveClient.getStorageSchema(true);
     assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
-                "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
+        "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
     String initInputFormatClassName = strategy.equals(HoodieSyncTableStrategy.RO)
-                                      ? HoodieParquetRealtimeInputFormat.class.getName()
-                                      : HoodieParquetInputFormat.class.getName();
+        ? HoodieParquetRealtimeInputFormat.class.getName()
+        : HoodieParquetInputFormat.class.getName();
     String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.PARQUET);
     String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET);
@@ -1089,28 +1091,28 @@ public class TestHiveSyncTool {
     // Create table 'test1'.
     hiveClient.createDatabase(HiveTestUtil.DB_NAME);
     hiveClient.createTable(HiveTestUtil.TABLE_NAME, schema, initInputFormatClassName,
-                           outputFormatClassName, serDeFormatClassName, new HashMap<>(), new HashMap<>());
+        outputFormatClassName, serDeFormatClassName, new HashMap<>(), new HashMap<>());
     assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
-               "Table " + HiveTestUtil.TABLE_NAME + " should exist initially");
+        "Table " + HiveTestUtil.TABLE_NAME + " should exist initially");
     String targetInputFormatClassName = strategy.equals(HoodieSyncTableStrategy.RO)
-                                        ? HoodieParquetInputFormat.class.getName()
-                                        : HoodieParquetRealtimeInputFormat.class.getName();
+        ? HoodieParquetInputFormat.class.getName()
+        : HoodieParquetRealtimeInputFormat.class.getName();
     StorageDescriptor storageDescriptor = hiveClient.getMetastoreStorageDescriptor(HiveTestUtil.TABLE_NAME);
     assertEquals(initInputFormatClassName, storageDescriptor.getInputFormat(),
-                 "Table " + HiveTestUtil.TABLE_NAME + " inputFormat should be " + targetInputFormatClassName);
+        "Table " + HiveTestUtil.TABLE_NAME + " inputFormat should be " + targetInputFormatClassName);
     assertFalse(storageDescriptor.getSerdeInfo().getParameters().containsKey(ConfigUtils.IS_QUERY_AS_RO_TABLE),
-                "Table " + HiveTestUtil.TABLE_NAME + " serdeInfo parameter " + ConfigUtils.IS_QUERY_AS_RO_TABLE + " should not exist");
+        "Table " + HiveTestUtil.TABLE_NAME + " serdeInfo parameter " + ConfigUtils.IS_QUERY_AS_RO_TABLE + " should not exist");
     reSyncHiveTable();
     storageDescriptor = hiveClient.getMetastoreStorageDescriptor(HiveTestUtil.TABLE_NAME);
     assertEquals(targetInputFormatClassName,
-                 storageDescriptor.getInputFormat(),
-                 "Table " + HiveTestUtil.TABLE_NAME + " inputFormat should be " + targetInputFormatClassName);
+        storageDescriptor.getInputFormat(),
+        "Table " + HiveTestUtil.TABLE_NAME + " inputFormat should be " + targetInputFormatClassName);
     assertEquals(storageDescriptor.getSerdeInfo().getParameters().get(ConfigUtils.IS_QUERY_AS_RO_TABLE),
-                 strategy.equals(HoodieSyncTableStrategy.RO) ? "true" : "false",
-                 "Table " + HiveTestUtil.TABLE_NAME + " serdeInfo parameter " + ConfigUtils.IS_QUERY_AS_RO_TABLE + " should be ");
+        strategy.equals(HoodieSyncTableStrategy.RO) ? "true" : "false",
+        "Table " + HiveTestUtil.TABLE_NAME + " serdeInfo parameter " + ConfigUtils.IS_QUERY_AS_RO_TABLE + " should be ");
}
@@ -1776,4 +1778,206 @@ public class TestHiveSyncTool {
private int getPartitionFieldSize() {
     return hiveSyncProps.getString(META_SYNC_PARTITION_FIELDS.key()).split(",").length;
}
+
+ @ParameterizedTest
+ @MethodSource("syncMode")
+  public void testSyncHoodieTableCatchRuntimeException(String syncMode) throws Exception {
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+ final String commitTime = "100";
+ HiveTestUtil.createCOWTable(commitTime, 1, true);
+
+ reInitHiveSyncClient();
+ assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
+ "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
+
+ // Create a tool instance to initialize the sync client
+ HiveSyncTool tool = new HiveSyncTool(hiveSyncProps, getHiveConf());
+ try {
+      // Corrupt the table by deleting the commit file to cause a RuntimeException during sync
+      // This will cause doSync() to fail when it tries to read the storage schema
+      Path metadataPath = new Path(HiveTestUtil.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME);
+      Path instantPath = new Path(metadataPath, commitTime + ".commit");
+      if (HiveTestUtil.fileSystem.exists(instantPath)) {
+        HiveTestUtil.fileSystem.delete(instantPath, false);
+      }
+
+      // Verify that syncHoodieTable catches RuntimeException and wraps it in HoodieException
+      HoodieException exception = assertThrows(HoodieException.class, () -> {
+        tool.syncHoodieTable();
+      }, "syncHoodieTable should throw HoodieException when RuntimeException occurs");
+
+      // Verify the exception message contains the table name
+      assertTrue(exception.getMessage().contains(HiveTestUtil.TABLE_NAME),
+          "Exception message should contain table name: " + HiveTestUtil.TABLE_NAME);
+      assertTrue(exception.getMessage().contains("Got runtime exception when hive syncing"),
+          "Exception message should contain expected error message");
+
+ // Verify that the cause is a RuntimeException
+ assertTrue(exception.getCause() instanceof RuntimeException,
+ "The cause should be a RuntimeException");
+ } finally {
+ // Ensure tool is closed even if exception is thrown
+ tool.close();
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("syncMode")
+  public void testRecreateAndSyncHiveTableOnError(String syncMode) throws Exception {
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+ hiveSyncProps.setProperty(RECREATE_HIVE_TABLE_ON_ERROR.key(), "true");
+
+ final String commitTime = "100";
+ HiveTestUtil.createCOWTable(commitTime, 1, true);
+
+ reInitHiveSyncClient();
+ assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
+ "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
+
+ // First sync should succeed
+ reSyncHiveTable();
+ assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
+ "Table " + HiveTestUtil.TABLE_NAME + " should exist after first sync");
+    assertEquals(commitTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+        "The last commit that was synced should be updated in the TBLPROPERTIES");
+
+    // Verify the Hoodie table structure is intact
+    Path hoodieMetaPath = new Path(HiveTestUtil.basePath, HoodieTableMetaClient.METAFOLDER_NAME);
+ assertTrue(HiveTestUtil.fileSystem.exists(hoodieMetaPath),
+ "Hoodie metadata folder should exist at " + hoodieMetaPath);
+
+    // Corrupt the Hive table by dropping it and creating it with incompatible serde/input format
+    // This will cause syncProperties to potentially fail when trying to update to Parquet formats
+    ddlExecutor.runSQL("DROP TABLE IF EXISTS `" + HiveTestUtil.TABLE_NAME + "`");
+
+ // Create table with correct schema but wrong input/output format
+    // The location must point to the correct basePath to keep Hoodie metadata accessible
+    String createTableSql = String.format(
+        "CREATE TABLE `%s` (`name` STRING, `favorite_number` INT, `favorite_color` STRING) "
+            + "PARTITIONED BY (`datestr` STRING) "
+            + "STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' "
+            + "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' "
+            + "LOCATION '%s'",
+ HiveTestUtil.TABLE_NAME, HiveTestUtil.basePath);
+ ddlExecutor.runSQL(createTableSql);
+
+    // Now sync should attempt to update the serde properties to use Parquet formats
+    // If this fails with HoodieHiveSyncException, recreateAndSyncHiveTable should be called
+ // which will drop and recreate the table with correct formats
+ reInitHiveSyncClient();
+ reSyncHiveTable();
+
+ // Verify the table was recreated and synced successfully
+ assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
+ "Table " + HiveTestUtil.TABLE_NAME + " should exist after recreation");
+
+ // Verify the schema is correct
+    Map<String, String> schema = hiveClient.getMetastoreSchema(HiveTestUtil.TABLE_NAME);
+    assertTrue(schema.containsKey("name"), "Schema should contain 'name' field");
+ assertEquals("string", schema.get("name").toLowerCase(),
+ "Schema should have correct type for 'name' field after recreation");
+    // Verify the last commit time is synced, indicating successful recreation and sync
+    assertEquals(commitTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+ "The last commit that was synced should be updated after recreation");
+ }
+
+ @ParameterizedTest
+ @MethodSource("syncModeAndEnablePushDown")
+  public void testRecreateCOWTableOnBasePathChange(String syncMode, String enablePushDown) throws Exception {
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+    hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(), enablePushDown);
+
+ String commitTime1 = "100";
+ HiveTestUtil.createCOWTable(commitTime1, 5, true);
+ reInitHiveSyncClient();
+ reSyncHiveTable();
+
+ String commitTime2 = "105";
+ // let's update the basepath
+    basePath = Files.createTempDirectory("hivesynctest_new" + Instant.now().toEpochMilli()).toUri().toString();
+ hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), basePath);
+
+ // let's create new table in new basepath
+ HiveTestUtil.createCOWTable(commitTime2, 2, true);
+    // Now let's create more partitions and these are the only ones which need to be synced
+ ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
+ String commitTime3 = "110";
+ // let's add 2 more partitions to the new basepath
+ HiveTestUtil.addCOWPartitions(2, false, true, dateTime, commitTime3);
+
+ // reinitialize hive client
+ reInitHiveSyncClient();
+    // after reinitializing hive client, table location shouldn't match hoodie base path
+    assertNotEquals(hiveClient.getBasePath(), hiveClient.getTableLocation(HiveTestUtil.TABLE_NAME), "table location should not match hoodie basepath before sync");
+
+ // Lets do the sync
+ reSyncHiveTable();
+ // verify partition count should be 4 from new basepath, not 5 from old
+    assertEquals(4, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
+ "the 4 partitions from new base path should be present for hive");
+ // verify last commit time synced
+    assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+ "The last commit that was synced should be 110");
+ // table location now should be updated to latest hoodie basepath
+    assertEquals(hiveClient.getBasePath(), hiveClient.getTableLocation(HiveTestUtil.TABLE_NAME), "new table location should match hoodie basepath");
+ }
+
+ @ParameterizedTest
+ @MethodSource("syncModeAndSchemaFromCommitMetadata")
+  public void testSyncMergeOnReadWithBasePathChange(boolean useSchemaFromCommitMetadata, String syncMode, String enablePushDown) throws Exception {
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+    hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(), enablePushDown);
+
+ String instantTime = "100";
+ String deltaCommitTime = "101";
+ HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
+ useSchemaFromCommitMetadata);
+
+    String roTableName = HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
+    String rtTableName = HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
+ reInitHiveSyncClient();
+    assertFalse(hiveClient.tableExists(roTableName), "Table " + roTableName + " should not exist initially");
+    assertFalse(hiveClient.tableExists(rtTableName), "Table " + rtTableName + " should not exist initially");
+ // Lets do the sync
+ reSyncHiveTable();
+
+ // change the hoodie base path
+    basePath = Files.createTempDirectory("hivesynctest_new" + Instant.now().toEpochMilli()).toUri().toString();
+ hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), basePath);
+
+ String instantTime2 = "102";
+ String deltaCommitTime2 = "103";
+ // let's create MOR table in the new basepath
+ HiveTestUtil.createMORTable(instantTime2, deltaCommitTime2, 2, true,
+ useSchemaFromCommitMetadata);
+
+ // let's add more partitions in the new basepath
+ ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
+ String commitTime3 = "104";
+ String deltaCommitTime3 = "105";
+ HiveTestUtil.addMORPartitions(2, true, false,
+ useSchemaFromCommitMetadata, dateTime, commitTime3, deltaCommitTime3);
+
+ // reinitialize hive client
+ reInitHiveSyncClient();
+ // verify table location is different from hoodie basepath
+    assertNotEquals(hiveClient.getBasePath(), hiveClient.getTableLocation(roTableName), "ro table location should not match hoodie base path before sync");
+    assertNotEquals(hiveClient.getBasePath(), hiveClient.getTableLocation(rtTableName), "rt table location should not match hoodie base path before sync");
+ // Lets do the sync
+ reSyncHiveTable();
+
+ // verify partition count should be 4, not 5 from old basepath
+ assertEquals(4, hiveClient.getAllPartitions(roTableName).size(),
+ "the 4 partitions from new base path should be present for ro table");
+ assertEquals(4, hiveClient.getAllPartitions(rtTableName).size(),
+ "the 4 partitions from new base path should be present for rt table");
+ // verify last synced commit time
+    assertEquals(deltaCommitTime3, hiveClient.getLastCommitTimeSynced(roTableName).get(),
+        "The last commit that was synced should be 105");
+    assertEquals(deltaCommitTime3, hiveClient.getLastCommitTimeSynced(rtTableName).get(),
+        "The last commit that was synced should be 105");
+ // verify table location is updated to the new hoodie basepath
+    assertEquals(hiveClient.getBasePath(), hiveClient.getTableLocation(roTableName), "ro table location should match hoodie base path after sync");
+    assertEquals(hiveClient.getBasePath(), hiveClient.getTableLocation(rtTableName), "rt table location should match hoodie base path after sync");
+ }
}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
index ca0bec3604bd..f0772f2b548b 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
@@ -20,6 +20,7 @@
package org.apache.hudi.sync.common;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;
@@ -56,6 +57,27 @@ public interface HoodieMetaSyncOperations {
}
+ /**
+ * Create or replace the table.
+ *
+ * @param tableName The table name.
+ * @param storageSchema The table schema.
+ * @param inputFormatClass The input format class of this table.
+ * @param outputFormatClass The output format class of this table.
+ * @param serdeClass The serde class of this table.
+ * @param serdeProperties The serde properties of this table.
+ * @param tableProperties The table properties for this table.
+ */
+ default void createOrReplaceTable(String tableName,
+ MessageType storageSchema,
+ String inputFormatClass,
+ String outputFormatClass,
+ String serdeClass,
+ Map<String, String> serdeProperties,
+ Map<String, String> tableProperties) {
+
+ }
+
/**
* Check if table exists in metastore.
*/
@@ -163,6 +185,13 @@ public interface HoodieMetaSyncOperations {
return Collections.emptyList();
}
+ /**
+   * Get the base path of the table from the metastore.
+ */
+ default String getTableLocation(String tableName) {
+ return StringUtils.EMPTY_STRING;
+ }
+
/**
   * Update the field comments for table in metastore, by using the ones from storage.
*
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index b96f2ad76121..13e3bde67a71 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -57,6 +58,7 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto
protected final HoodieSyncConfig config;
protected final PartitionValueExtractor partitionValueExtractor;
protected final HoodieTableMetaClient metaClient;
+ private static final String TEMP_SUFFIX = "_temp";
public HoodieSyncClient(HoodieSyncConfig config) {
this.config = config;
@@ -248,4 +250,8 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto
}
return paths;
}
+
+ protected String generateTempTableName(String tableName) {
+ return tableName + TEMP_SUFFIX + ZonedDateTime.now().toEpochSecond();
+ }
}
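For context, the helper above produces temp names of the form
"<tableName>_temp<epochSecond>"; the values below are illustrative only:

    // e.g. generateTempTableName("customers") at epoch second 1739379070
    // -> "customers_temp1739379070"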