This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.14.2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-0.14.2-prep by this
push:
new 6ff2eb94af76 fix: cherry-picking for 0.14.2 release (#17924)
6ff2eb94af76 is described below
commit 6ff2eb94af76ab7c681f6a9209958a8288214289
Author: Lin Liu <[email protected]>
AuthorDate: Thu Mar 5 01:50:38 2026 -0800
fix: cherry-picking for 0.14.2 release (#17924)
* [HUDI-9088] Fix unnecessary scanning of target table in MERGE INTO on
Spark (#12934)
* [HUDI-8212] Add extra config of billing project ID for BigQuery sync
(#11988)
Co-authored-by: Y Ethan Guo <[email protected]>
* [HUDI-9288] Fixing HoodieFileGroup api related to uncommitted slices
(#13125)
* [HUDI-6868] Support extracting passwords from credential store for Hive
Sync (#10577)
Co-authored-by: Danny Chan <[email protected]>
* [MINOR] [BRANCH-0.x] Added condition to check default value to fix
extracting password from credential store (#11247)
* [HUDI-9039] Run do init table transaction only when required (#12847)
Co-authored-by: Leon Lin <[email protected]>
* [HUDI-9681] Remove mkdir in partition listing and add try catch to
listStatus of partition (#13739)
* [HUDI-9770] During hive/glue sync, ensure drop partition events are
generated when partition is present in the metastore (#13794)
* [HUDI-7478] Fix max delta commits guard check w/ MDT (#10820)
Co-authored-by: Vova Kolmakov <[email protected]>
* [HUDI-8161] Make spark-sql command 'desc' independent from schema
evolution config (#11871)
Co-authored-by: Vova Kolmakov <[email protected]>
* Fix CI failures
* Add some preliminary change
* Fix a CI test
* Fix validation bundle failures
* Try update docker_java17_test.sh
---------
Co-authored-by: Y Ethan Guo <[email protected]>
Co-authored-by: Aditya Goenka
<[email protected]>
Co-authored-by: Tim Brown <[email protected]>
Co-authored-by: Danny Chan <[email protected]>
Co-authored-by: Leon Lin <[email protected]>
Co-authored-by: Leon Lin <[email protected]>
Co-authored-by: Roushan Kumar <[email protected]>
Co-authored-by: wombatu-kun <[email protected]>
Co-authored-by: Vova Kolmakov <[email protected]>
Co-authored-by: Vova Kolmakov <[email protected]>
---
.github/workflows/bot.yml | 11 ++
.../apache/hudi/client/BaseHoodieWriteClient.java | 22 ++-
.../metadata/HoodieBackedTableMetadataWriter.java | 4 +-
.../functional/TestHoodieBackedMetadata.java | 37 +++++
.../apache/hudi/common/model/HoodieFileGroup.java | 2 +-
.../table/view/AbstractTableFileSystemView.java | 22 +--
.../metadata/FileSystemBackedTableMetadata.java | 9 +-
.../hudi/common/model/TestHoodieFileGroup.java | 1 +
.../hudi/gcp/bigquery/BigQuerySyncConfig.java | 12 ++
.../gcp/bigquery/HoodieBigQuerySyncClient.java | 7 +-
.../hudi/gcp/bigquery/TestBigQuerySyncConfig.java | 3 +
.../gcp/bigquery/TestHoodieBigQuerySyncClient.java | 33 ++++-
hudi-integ-test/pom.xml | 10 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 2 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 16 ++-
.../hudi/command/MergeIntoHoodieTableCommand.scala | 51 +++----
.../spark/sql/hudi/HoodieSparkSqlTestBase.scala | 51 ++++++-
.../apache/spark/sql/hudi/TestDescribeTable.scala | 117 ++++++++++++++++
.../apache/spark/sql/hudi/TestMergeIntoTable.scala | 150 +++++++++++++--------
.../hudi/analysis/HoodieSpark32PlusAnalysis.scala | 9 +-
.../java/org/apache/hudi/hive/HiveSyncTool.java | 1 +
.../org/apache/hudi/hive/TestHiveSyncTool.java | 123 +++++++++++++++++
.../apache/hudi/sync/common/HoodieSyncClient.java | 13 +-
packaging/bundle-validation/ci_run.sh | 6 +
packaging/bundle-validation/validate.sh | 26 ++--
25 files changed, 619 insertions(+), 119 deletions(-)
diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index fd3cc67976a1..432d35981728 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -421,6 +421,17 @@ jobs:
java-version: '8'
distribution: 'adopt'
architecture: x64
+ - name: Check disk space
+ run: df -h
+ - name: 'Free space'
+ run: |
+ sudo rm -rf /usr/share/dotnet
+ sudo rm -rf /usr/local/lib/android
+ sudo rm -rf /opt/ghc
+ sudo rm -rf /usr/local/share/boost
+ docker system prune --all --force --volumes
+ - name: Check disk space after cleanup
+ run: df -h
- name: Build Project
env:
SPARK_PROFILE: ${{ matrix.sparkProfile }}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index d847468e4145..bfb811a15990 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
@@ -1258,15 +1259,32 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
if (instantTime.isPresent()) {
ownerInstant = Option.of(new HoodieInstant(true,
CommitUtils.getCommitActionType(operationType, metaClient.getTableType()),
instantTime.get()));
}
- this.txnManager.beginTransaction(ownerInstant, Option.empty());
- try {
+
+ boolean requiresInitTable = needsUpgradeOrDowngrade(metaClient) ||
config.isMetadataTableEnabled();
+ if (!requiresInitTable) {
+ return;
+ }
+ executeUsingTxnManager(ownerInstant, () -> {
tryUpgrade(metaClient, instantTime);
+ // TODO: this also does MT table management..
initMetadataTable(instantTime);
+ });
+ }
+
+ private void executeUsingTxnManager(Option<HoodieInstant> ownerInstant,
Runnable r) {
+ this.txnManager.beginTransaction(ownerInstant, Option.empty());
+ try {
+ r.run();
} finally {
this.txnManager.endTransaction(ownerInstant);
}
}
+ private boolean needsUpgradeOrDowngrade(HoodieTableMetaClient metaClient) {
+ UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(metaClient,
config, context, upgradeDowngradeHelper);
+ return
upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current());
+ }
+
/**
* Bootstrap the metadata table.
*
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4687bf47c2c9..15838520edcc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -97,7 +97,7 @@ import java.util.stream.IntStream;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
import static
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
@@ -762,7 +762,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
protected static void checkNumDeltaCommits(HoodieTableMetaClient metaClient,
int maxNumDeltaCommitsWhenPending) {
final HoodieActiveTimeline activeTimeline =
metaClient.reloadActiveTimeline();
Option<HoodieInstant> lastCompaction =
activeTimeline.filterCompletedInstants()
- .filter(s -> s.getAction().equals(COMPACTION_ACTION)).lastInstant();
+ .filter(s -> s.getAction().equals(COMMIT_ACTION)).lastInstant();
int numDeltaCommits = lastCompaction.isPresent()
?
activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().getTimestamp()).countInstants()
: activeTimeline.getDeltaCommitTimeline().countInstants();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index ffeade2af57c..739bc5df9361 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -165,6 +165,7 @@ import static
org.apache.hudi.common.model.WriteOperationType.DELETE;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_EXTENSION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_EXTENSION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.INFLIGHT_EXTENSION;
@@ -2886,6 +2887,42 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
assertTrue(t.getMessage().startsWith(String.format("Metadata table's
deltacommits exceeded %d: ", maxNumDeltacommits)));
}
+ @Test
+ public void testMORCheckNumDeltaCommits() throws Exception {
+ init(MERGE_ON_READ, true);
+ final int maxNumDeltaCommits = 3;
+ writeConfig = getWriteConfigBuilder(true, true, false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .enableMetrics(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(maxNumDeltaCommits
- 1)
+ .withMaxNumDeltacommitsWhenPending(maxNumDeltaCommits)
+ .build())
+ .build();
+ initWriteConfigAndMetatableWriter(writeConfig, true);
+ // write deltacommits to data-table and do compaction in metadata-table
(with commit-instant)
+ doWriteOperation(testTable, HoodieActiveTimeline.createNewInstantTime(1));
+ doWriteOperation(testTable, HoodieActiveTimeline.createNewInstantTime(1));
+ // ensure the compaction is triggered and executed
+ try (HoodieBackedTableMetadata metadata = new
HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(),
writeConfig.getBasePath(), true)) {
+ HoodieTableMetaClient metadataMetaClient =
metadata.getMetadataMetaClient();
+ final HoodieActiveTimeline activeTimeline =
metadataMetaClient.reloadActiveTimeline();
+ Option<HoodieInstant> lastCompaction =
activeTimeline.filterCompletedInstants()
+ .filter(s -> s.getAction().equals(COMMIT_ACTION)).lastInstant();
+ assertTrue(lastCompaction.isPresent());
+ // create pending instant in data table
+
testTable.addRequestedCommit(HoodieActiveTimeline.createNewInstantTime(1));
+ // continue writing
+ for (int i = 0; i < maxNumDeltaCommits; i++) {
+ doWriteOperation(testTable,
HoodieActiveTimeline.createNewInstantTime(1));
+ }
+ Throwable t = assertThrows(HoodieMetadataException.class, () ->
doWriteOperation(testTable, HoodieActiveTimeline.createNewInstantTime(1)));
+ assertTrue(t.getMessage().startsWith(String.format("Metadata table's
deltacommits exceeded %d: ", maxNumDeltaCommits)));
+ assertEquals(maxNumDeltaCommits + 1,
+
activeTimeline.reload().getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().getTimestamp()).countInstants());
+ }
+ }
+
@Test
public void testNonPartitioned() throws Exception {
init(HoodieTableType.COPY_ON_WRITE, false);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
index 9b5e8c1dd6f0..1c042d051aa9 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
@@ -154,7 +154,7 @@ public class HoodieFileGroup implements Serializable {
}
public Stream<FileSlice> getAllFileSlicesBeforeOn(String maxInstantTime) {
- return fileSlices.values().stream().filter(slice ->
compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS,
maxInstantTime));
+ return getAllFileSlices().filter(slice ->
compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS,
maxInstantTime));
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index c6e524e8dd78..d051b33b74b5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -72,6 +72,7 @@ import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
/**
* Common thread-safe implementation for multiple TableFileSystemView
Implementations.
@@ -254,7 +255,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
// Duplicate key error when insert_overwrite same partition in multi
writer, keep the instant with greater timestamp when the file group id conflicts
Map<HoodieFileGroupId, HoodieInstant> replacedFileGroups =
resultStream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
- (instance1, instance2) ->
HoodieTimeline.compareTimestamps(instance1.getTimestamp(),
HoodieTimeline.LESSER_THAN, instance2.getTimestamp()) ? instance2 : instance1));
+ (instance1, instance2) -> compareTimestamps(instance1.getTimestamp(),
HoodieTimeline.LESSER_THAN, instance2.getTimestamp()) ? instance2 : instance1));
resetReplacedFileGroups(replacedFileGroups);
LOG.info("Took " + hoodieTimer.endTimer() + " ms to read " +
replacedTimeline.countInstants() + " instants, "
+ replacedFileGroups.size() + " replaced file groups");
@@ -397,9 +398,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
try {
fileStatusMap.put(partitionPair,
metaClient.getFs().listStatus(absolutePartitionPath));
} catch (IOException e) {
- // Create the path if it does not exist already
if (!metaClient.getFs().exists(absolutePartitionPath)) {
- metaClient.getFs().mkdirs(absolutePartitionPath);
fileStatusMap.put(partitionPair, new FileStatus[0]);
} else {
// in case the partition path was created by another caller
@@ -466,9 +465,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
try {
return metaClient.getFs().listStatus(partitionPath);
} catch (IOException e) {
- // Create the path if it does not exist already
if (!metaClient.getFs().exists(partitionPath)) {
- metaClient.getFs().mkdirs(partitionPath);
return new FileStatus[0];
} else {
// in case the partition path was created by another caller
@@ -702,7 +699,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
return fetchAllStoredFileGroups(partitionPath)
.filter(fileGroup ->
!isFileGroupReplacedBeforeOrOn(fileGroup.getFileGroupId(), maxCommitTime))
.map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles()
- .filter(baseFile ->
HoodieTimeline.compareTimestamps(baseFile.getCommitTime(),
HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
+ .filter(baseFile -> compareTimestamps(baseFile.getCommitTime(),
HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
))
.filter(df -> !isBaseFileDueToPendingCompaction(df) &&
!isBaseFileDueToPendingClustering(df)).findFirst()))
.filter(Option::isPresent).map(Option::get)
@@ -719,7 +716,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
return Option.empty();
} else {
return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup ->
fileGroup.getAllBaseFiles()
- .filter(baseFile ->
HoodieTimeline.compareTimestamps(baseFile.getCommitTime(),
HoodieTimeline.EQUALS,
+ .filter(baseFile ->
compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS,
instantTime)).filter(df ->
!isBaseFileDueToPendingCompaction(df) &&
!isBaseFileDueToPendingClustering(df)).findFirst().orElse(null))
.map(df -> addBootstrapBaseFileIfPresent(new
HoodieFileGroupId(partitionPath, fileId), df));
}
@@ -1444,7 +1441,10 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
* @param maxInstantTime The max instant time
*/
private Option<FileSlice> fetchAllLogsMergedFileSlice(HoodieFileGroup
fileGroup, String maxInstantTime) {
- List<FileSlice> fileSlices =
fileGroup.getAllFileSlicesBeforeOn(maxInstantTime).collect(Collectors.toList());
+ List<FileSlice> fileSlices =
fileGroup.getAllRawFileSlices().collect(Collectors.toList());
+ fileSlices = fileSlices.stream()
+ .filter(slice -> compareTimestamps(slice.getBaseInstantTime(),
LESSER_THAN_OR_EQUALS, maxInstantTime))
+ .collect(Collectors.toList());
if (fileSlices.size() == 0) {
return Option.empty();
}
@@ -1506,7 +1506,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
return false;
}
- return HoodieTimeline.compareTimestamps(instant, GREATER_THAN,
hoodieInstantOption.get().getTimestamp());
+ return compareTimestamps(instant, GREATER_THAN,
hoodieInstantOption.get().getTimestamp());
}
private boolean isFileGroupReplacedBeforeOrOn(HoodieFileGroupId fileGroupId,
String instant) {
@@ -1515,7 +1515,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
return false;
}
- return HoodieTimeline.compareTimestamps(instant, GREATER_THAN_OR_EQUALS,
hoodieInstantOption.get().getTimestamp());
+ return compareTimestamps(instant, GREATER_THAN_OR_EQUALS,
hoodieInstantOption.get().getTimestamp());
}
private boolean isFileGroupReplacedAfterOrOn(HoodieFileGroupId fileGroupId,
String instant) {
@@ -1524,7 +1524,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
return false;
}
- return HoodieTimeline.compareTimestamps(instant, LESSER_THAN_OR_EQUALS,
hoodieInstantOption.get().getTimestamp());
+ return compareTimestamps(instant, LESSER_THAN_OR_EQUALS,
hoodieInstantOption.get().getTimestamp());
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 51797677016c..164b4c8ae3f5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -55,6 +56,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Implementation of {@link HoodieTableMetadata} based file-system-backed
table metadata.
@@ -173,7 +175,12 @@ public class FileSystemBackedTableMetadata extends
AbstractHoodieTableMetadata {
// Need to use serializable file status here, see HUDI-5936
List<HoodieSerializableFileStatus> dirToFileListing =
engineContext.flatMap(pathsToList, path -> {
FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
- return
Arrays.stream(HoodieSerializableFileStatus.fromFileStatuses(fileSystem.listStatus(path)));
+ try {
+ return
Arrays.stream(HoodieSerializableFileStatus.fromFileStatuses(fileSystem.listStatus(path)));
+ } catch (FileNotFoundException e) {
+ // The partition may have been cleaned.
+ return Stream.empty();
+ }
}, listingParallelism);
pathsToList.clear();
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
index a7cdf22f8020..9956c8850061 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
@@ -50,6 +50,7 @@ public class TestHoodieFileGroup {
fileGroup.addBaseFile(baseFile);
}
assertEquals(2, fileGroup.getAllFileSlices().count());
+ assertEquals(2, fileGroup.getAllFileSlicesBeforeOn("002").count());
assertTrue(!fileGroup.getAllFileSlices().anyMatch(s ->
s.getBaseInstantTime().equals("002")));
assertEquals(3, fileGroup.getAllFileSlicesIncludingInflight().count());
assertTrue(fileGroup.getLatestFileSlice().get().getBaseInstantTime().equals("001"));
diff --git
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
index ec0354355795..b15b4f2842cd 100644
---
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
+++
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
@@ -58,6 +58,15 @@ public class BigQuerySyncConfig extends HoodieSyncConfig
implements Serializable
.markAdvanced()
.withDocumentation("Name of the target project in BigQuery");
+ public static final ConfigProperty<String> BIGQUERY_SYNC_BILLING_PROJECT_ID
= ConfigProperty
+ .key("hoodie.gcp.bigquery.sync.billing.project.id")
+ .noDefaultValue()
+ .sinceVersion("0.15.1")
+ .markAdvanced()
+ .withDocumentation("Name of the billing project id in BigQuery. By
default it uses the "
+ + "configuration from `hoodie.gcp.bigquery.sync.project_id` if this
configuration is "
+ + "not set. This can only be used with manifest file based
approach");
+
public static final ConfigProperty<String> BIGQUERY_SYNC_DATASET_NAME =
ConfigProperty
.key("hoodie.gcp.bigquery.sync.dataset_name")
.noDefaultValue()
@@ -156,6 +165,8 @@ public class BigQuerySyncConfig extends HoodieSyncConfig
implements Serializable
@Parameter(names = {"--project-id"}, description = "Name of the target
project in BigQuery", required = true)
public String projectId;
+ @Parameter(names = {"--billing-project-id"}, description = "Name of the
billing project in BigQuery. This can only be used with
--use-bq-manifest-file", required = false)
+ public String billingProjectId;
@Parameter(names = {"--dataset-name"}, description = "Name of the target
dataset in BigQuery", required = true)
public String datasetName;
@Parameter(names = {"--dataset-location"}, description = "Location of the
target dataset in BigQuery", required = true)
@@ -181,6 +192,7 @@ public class BigQuerySyncConfig extends HoodieSyncConfig
implements Serializable
public TypedProperties toProps() {
final TypedProperties props = hoodieSyncConfigParams.toProps();
props.setPropertyIfNonNull(BIGQUERY_SYNC_PROJECT_ID.key(), projectId);
+ props.setPropertyIfNonNull(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(),
billingProjectId);
props.setPropertyIfNonNull(BIGQUERY_SYNC_DATASET_NAME.key(),
datasetName);
props.setPropertyIfNonNull(BIGQUERY_SYNC_DATASET_LOCATION.key(),
datasetLocation);
props.setPropertyIfNonNull(BIGQUERY_SYNC_TABLE_NAME.key(),
hoodieSyncConfigParams.tableName);
diff --git
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
index af56194214df..11733352670f 100644
---
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
+++
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
@@ -56,6 +56,8 @@ import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BIG_
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;
+
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER;
public class HoodieBigQuerySyncClient extends HoodieSyncClient {
@@ -64,6 +66,7 @@ public class HoodieBigQuerySyncClient extends
HoodieSyncClient {
protected final BigQuerySyncConfig config;
private final String projectId;
+ private final String billingProjectId;
private final String bigLakeConnectionId;
private final String datasetName;
private final boolean requirePartitionFilter;
@@ -73,6 +76,7 @@ public class HoodieBigQuerySyncClient extends
HoodieSyncClient {
super(config);
this.config = config;
this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
+ this.billingProjectId = config.getString(BIGQUERY_SYNC_BILLING_PROJECT_ID);
this.bigLakeConnectionId =
config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID);
this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
this.requirePartitionFilter =
config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
@@ -84,6 +88,7 @@ public class HoodieBigQuerySyncClient extends
HoodieSyncClient {
super(config);
this.config = config;
this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
+ this.billingProjectId =
config.getStringOrDefault(BIGQUERY_SYNC_BILLING_PROJECT_ID, this.projectId);
this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
this.requirePartitionFilter =
config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
this.bigquery = bigquery;
@@ -129,7 +134,7 @@ public class HoodieBigQuerySyncClient extends
HoodieSyncClient {
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(query)
.setUseLegacySql(false)
.build();
- JobId jobId =
JobId.newBuilder().setProject(projectId).setRandomJob().build();
+ JobId jobId =
JobId.newBuilder().setProject(billingProjectId).setRandomJob().build();
Job queryJob =
bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
queryJob = queryJob.waitFor();
diff --git
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
index d31566df1315..910027b45303 100644
---
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
+++
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
@@ -34,6 +34,7 @@ import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATA
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
@@ -49,6 +50,7 @@ public class TestBigQuerySyncConfig {
public void testGetConfigs() {
Properties props = new Properties();
props.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), "fooproject");
+ props.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(),
"foobillingproject");
props.setProperty(BIGQUERY_SYNC_DATASET_NAME.key(), "foodataset");
props.setProperty(BIGQUERY_SYNC_DATASET_LOCATION.key(), "US");
props.setProperty(BIGQUERY_SYNC_TABLE_NAME.key(), "footable");
@@ -61,6 +63,7 @@ public class TestBigQuerySyncConfig {
props.setProperty(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING.key(), "true");
BigQuerySyncConfig syncConfig = new BigQuerySyncConfig(props);
assertEquals("fooproject", syncConfig.getString(BIGQUERY_SYNC_PROJECT_ID));
+ assertEquals("foobillingproject",
syncConfig.getString(BIGQUERY_SYNC_BILLING_PROJECT_ID));
assertEquals("foodataset",
syncConfig.getString(BIGQUERY_SYNC_DATASET_NAME));
assertEquals("US", syncConfig.getString(BIGQUERY_SYNC_DATASET_LOCATION));
assertEquals("footable", syncConfig.getString(BIGQUERY_SYNC_TABLE_NAME));
diff --git
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
index 37b2800b563d..8519c5bdafd0 100644
---
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
+++
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
@@ -37,17 +37,22 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import java.nio.file.Path;
import java.util.Properties;
+import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;
+import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestHoodieBigQuerySyncClient {
private static final String PROJECT_ID = "test_project";
+ private static final String BILLING_PROJECT_ID = "test_billing_project";
private static final String MANIFEST_FILE_URI = "file:/manifest_file";
private static final String SOURCE_PREFIX = "file:/manifest_file/date=*";
private static final String TEST_TABLE = "test_table";
@@ -73,12 +78,38 @@ public class TestHoodieBigQuerySyncClient {
@BeforeEach
void setup() {
properties = new Properties();
- properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID.key(),
PROJECT_ID);
+ properties.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), PROJECT_ID);
+ properties.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(),
BILLING_PROJECT_ID);
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(),
TEST_DATASET);
properties.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(),
tempDir.toString());
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(),
"true");
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCreateOrUpdateTableUsingManifestWithBillingProjectId(boolean
setBillingProjectId) {
+ Properties props = new Properties();
+ props.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), PROJECT_ID);
+ if (setBillingProjectId) {
+ props.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(),
BILLING_PROJECT_ID);
+ }
+ props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(),
TEST_DATASET);
+ props.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(),
tempDir.toString());
+
props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(),
"true");
+ BigQuerySyncConfig syncConfig = new BigQuerySyncConfig(props);
+ Job mockJob = mock(Job.class);
+ ArgumentCaptor<JobInfo> jobInfoCaptor =
ArgumentCaptor.forClass(JobInfo.class);
+ when(mockBigQuery.create(jobInfoCaptor.capture())).thenReturn(mockJob);
+
+ HoodieBigQuerySyncClient syncClient = new
HoodieBigQuerySyncClient(syncConfig, mockBigQuery);
+ Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
+ syncClient.createOrUpdateTableUsingBqManifestFile(TEST_TABLE,
MANIFEST_FILE_URI, SOURCE_PREFIX, schema);
+
+ assertEquals(
+ setBillingProjectId ? BILLING_PROJECT_ID : PROJECT_ID,
+ jobInfoCaptor.getValue().getJobId().getProject());
+ }
+
@Test
void createTableWithManifestFile_partitioned() throws Exception {
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(),
"my-project.us.bl_connection");
diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml
index 174ba397c780..abf316daf79c 100644
--- a/hudi-integ-test/pom.xml
+++ b/hudi-integ-test/pom.xml
@@ -375,7 +375,9 @@
<properties>
<dockerCompose.envFile>${project.basedir}/compose_env</dockerCompose.envFile>
-
<dockerCompose.file>${project.basedir}/../docker/compose/docker-compose_hadoop284_hive233_spark244.yml</dockerCompose.file>
+ <dockerCompose.file>
+
${project.basedir}/../docker/compose/docker-compose_hadoop284_hive233_spark244.yml
+ </dockerCompose.file>
<docker.compose.skip>${skipITs}</docker.compose.skip>
<main.basedir>${project.parent.basedir}</main.basedir>
</properties>
@@ -499,7 +501,9 @@
<profile>
<id>m1-mac</id>
<properties>
-
<dockerCompose.file>${project.basedir}/../docker/compose/docker-compose_hadoop284_hive233_spark244_mac_aarch64.yml</dockerCompose.file>
+ <dockerCompose.file>
+
${project.basedir}/../docker/compose/docker-compose_hadoop284_hive233_spark244_mac_aarch64.yml
+ </dockerCompose.file>
</properties>
<activation>
<os>
@@ -509,4 +513,4 @@
</activation>
</profile>
</profiles>
-</project>
+</project>
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index d8110a31f09c..332a1c311435 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -667,7 +667,7 @@ object DataSourceWriteOptions {
.defaultValue("true")
.markAdvanced()
.sinceVersion("0.14.0")
- .withDocumentation("Controls whether spark sql prepped update, delete, and
merge are enabled.")
+ .withDocumentation("Controls whether spark sql prepped update and delete
are enabled.")
val OVERWRITE_MODE: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.overwrite.mode")
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 41e8ba902a7e..a311368fe451 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -21,6 +21,8 @@ import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.shims.ShimLoader
import
org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType,
convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
import
org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
@@ -1006,7 +1008,19 @@ class HoodieSparkSqlWriterInternal {
properties.put(HiveSyncConfigHolder.HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key,
spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD).toString)
properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION.key,
SPARK_VERSION)
properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key,
hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))
-
+ if ((fs.getConf.get(HiveConf.ConfVars.METASTOREPWD.varname) == null ||
fs.getConf.get(HiveConf.ConfVars.METASTOREPWD.varname).isEmpty) &&
+ (properties.get(HiveSyncConfigHolder.HIVE_PASS.key()) == null ||
properties.get(HiveSyncConfigHolder.HIVE_PASS.key()).toString.isEmpty ||
properties.get(HiveSyncConfigHolder.HIVE_PASS.key()).toString.equalsIgnoreCase(HiveSyncConfigHolder.HIVE_PASS.defaultValue()))){
+ try {
+ val passwd =
ShimLoader.getHadoopShims.getPassword(spark.sparkContext.hadoopConfiguration,
HiveConf.ConfVars.METASTOREPWD.varname)
+ if (passwd != null && !passwd.isEmpty) {
+ fs.getConf.set(HiveConf.ConfVars.METASTOREPWD.varname, passwd)
+ properties.put(HiveSyncConfigHolder.HIVE_PASS.key(), passwd)
+ }
+ } catch {
+ case e: Exception =>
+ log.info("Exception while trying to get Meta Sync password from
hadoop credential store", e)
+ }
+ }
// Collect exceptions in list because we want all sync to run. Then we
can throw
val failedMetaSyncs = new mutable.HashMap[String,HoodieException]()
syncClientToolClassSet.foreach(impl => {
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index dd8e62ab53c9..de7a9baa9054 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -275,11 +275,10 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
sparkSession.conf.set("spark.sql.crossJoin.enabled","true")
}
- val projectedJoinedDF: DataFrame = projectedJoinedDataset
- // Create the write parameters
val props = buildMergeIntoConfig(hoodieCatalogTable)
+ val processedInputDf: DataFrame = getProcessedInputDf
// Do the upsert
- executeUpsert(projectedJoinedDF, props)
+ executeUpsert(processedInputDf, props)
// Refresh the table in the catalog
sparkSession.catalog.refreshTable(hoodieCatalogTable.table.qualifiedName)
@@ -290,6 +289,10 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
private val insertingActions: Seq[InsertAction] =
mergeInto.notMatchedActions.collect { case u: InsertAction => u}
private val deletingActions: Seq[DeleteAction] =
mergeInto.matchedActions.collect { case u: DeleteAction => u}
+ private def hasPrimaryKey(): Boolean = {
+ hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent
+ }
+
/**
* Here we're adjusting incoming (source) dataset in case its schema is
divergent from
* the target table, to make sure it (at a bare minimum)
@@ -328,29 +331,30 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
* <li>{@code ts = source.sts}</li>
* </ul>
*/
- def projectedJoinedDataset: DataFrame = {
+ private def getProcessedInputDf: DataFrame = {
val resolver = sparkSession.sessionState.analyzer.resolver
- // We want to join the source and target tables.
- // Then we want to project the output so that we have the meta columns
from the target table
- // followed by the data columns of the source table
- val tableMetaCols = mergeInto.targetTable.output.filter(a =>
isMetaField(a.name))
- val joinData =
sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable,
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
- val incomingDataCols =
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
- // for pkless table, we need to project the meta columns
- val hasPrimaryKey =
hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent
- val projectedJoinPlan = if (!hasPrimaryKey ||
sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(),
"false") == "true") {
+ // For pkless table, we need to project the meta columns by joining with
the target table;
+ // for a Hudi table with record key, we use the source table and rely on
Hudi's tagging
+ // to identify inserts, updates, and deletes to avoid the join
+ val inputPlan = if (!hasPrimaryKey()) {
+ // We want to join the source and target tables.
+ // Then we want to project the output so that we have the meta columns
from the target table
+ // followed by the data columns of the source table
+ val tableMetaCols = mergeInto.targetTable.output.filter(a =>
isMetaField(a.name))
+ val joinData =
sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable,
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
+ val incomingDataCols =
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
Project(tableMetaCols ++ incomingDataCols, joinData)
} else {
- Project(incomingDataCols, joinData)
+ mergeInto.sourceTable
}
- val projectedJoinOutput = projectedJoinPlan.output
+ val inputPlanAttributes = inputPlan.output
val requiredAttributesMap = recordKeyAttributeToConditionExpression ++
preCombineAttributeAssociatedExpression
val (existingAttributesMap, missingAttributesMap) =
requiredAttributesMap.partition {
- case (keyAttr, _) => projectedJoinOutput.exists(attr =>
resolver(keyAttr.name, attr.name))
+ case (keyAttr, _) => inputPlanAttributes.exists(attr =>
resolver(keyAttr.name, attr.name))
}
// This is to handle the situation where condition is something like
"s0.s_id = t0.id" so In the source table
@@ -362,7 +366,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// them according to aforementioned heuristic) to meet Hudi's
requirements
val additionalColumns: Seq[NamedExpression] =
missingAttributesMap.flatMap {
- case (keyAttr, sourceExpression) if !projectedJoinOutput.exists(attr
=> resolver(attr.name, keyAttr.name)) =>
+ case (keyAttr, sourceExpression) if !inputPlanAttributes.exists(attr
=> resolver(attr.name, keyAttr.name)) =>
Seq(Alias(sourceExpression, keyAttr.name)())
case _ => Seq()
@@ -372,7 +376,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// matches to that one of the target table. This is necessary b/c unlike
Spark, Avro is case-sensitive
// and therefore would fail downstream if case of corresponding columns
don't match
val existingAttributes = existingAttributesMap.map(_._1)
- val adjustedSourceTableOutput = projectedJoinOutput.map { attr =>
+ val adjustedSourceTableOutput = inputPlanAttributes.map { attr =>
existingAttributes.find(keyAttr => resolver(keyAttr.name, attr.name))
match {
// To align the casing we just rename the attribute to match that one
of the
// target table
@@ -381,7 +385,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
}
}
- val amendedPlan = Project(adjustedSourceTableOutput ++ additionalColumns,
projectedJoinPlan)
+ val amendedPlan = Project(adjustedSourceTableOutput ++ additionalColumns,
inputPlan)
Dataset.ofRows(sparkSession, amendedPlan)
}
@@ -575,7 +579,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// NOTE: We're relying on [[sourceDataset]] here instead of
[[mergeInto.sourceTable]],
// as it could be amended to add missing primary-key and/or
pre-combine columns.
// Please check [[sourceDataset]] scala-doc for more details
- (projectedJoinedDataset.queryExecution.analyzed.output ++
mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
+ (getProcessedInputDf.queryExecution.analyzed.output ++
mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
}
private def resolvesToSourceAttribute(expr: Expression): Boolean = {
@@ -618,9 +622,8 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("")
val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable,
tableConfig)
// for pkless tables, we need to enable optimized merge
- val hasPrimaryKey = tableConfig.getRecordKeyFields.isPresent
- val enableOptimizedMerge = if (!hasPrimaryKey) "true" else
sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(),
"false")
- val keyGeneratorClassName = if (enableOptimizedMerge == "true") {
+ val isPrimaryKeylessTable = !hasPrimaryKey()
+ val keyGeneratorClassName = if (isPrimaryKeylessTable) {
classOf[MergeIntoKeyGenerator].getCanonicalName
} else {
classOf[SqlKeyGenerator].getCanonicalName
@@ -658,7 +661,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
CANONICALIZE_SCHEMA.key -> "false",
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true",
HoodieSparkSqlWriter.SQL_MERGE_INTO_WRITES.key -> "true",
- HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY ->
enableOptimizedMerge,
+ HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY ->
isPrimaryKeylessTable.toString,
HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key() ->
(!StringUtils.isNullOrEmpty(preCombineField)).toString
)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index bc2a169779c5..2e06da2aee51 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -17,25 +17,33 @@
package org.apache.spark.sql.hudi
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.HoodieSparkRecordMerger
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.config.HoodieStorageConfig
-import org.apache.hudi.common.model.HoodieAvroRecordMerger
+import org.apache.hudi.common.config.{HoodieCommonConfig,
HoodieMetadataConfig, HoodieStorageConfig}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecord}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
+import org.apache.hudi.common.table.view.{FileSystemViewManager,
FileSystemViewStorageConfig, SyncableFileSystemView}
+import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
+import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
+import org.apache.spark
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.checkMessageContains
+import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.util.Utils
import org.joda.time.DateTimeZone
import org.scalactic.source
+import org.scalatest.Assertions.assertResult
import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
import java.io.File
@@ -228,6 +236,13 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
HoodieRecordType.AVRO
}
}
+
+ protected def validateTableSchema(tableName: String,
+ expectedStructFields: List[StructField]):
Unit = {
+ assertResult(expectedStructFields)(
+ spark.sql(s"select * from $tableName").schema.fields
+ .filter(e =>
!HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(e.name)))
+ }
}
object HoodieSparkSqlTestBase {
@@ -255,4 +270,34 @@ object HoodieSparkSqlTestBase {
private def checkMessageContains(e: Throwable, text: String): Boolean =
e.getMessage.trim.contains(text.trim)
+ def getMetaClientAndFileSystemView(basePath: String):
+ (HoodieTableMetaClient, SyncableFileSystemView) = {
+ val storageConf = new Configuration()
+ val metaClient: HoodieTableMetaClient =
+
HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build
+ val metadataConfig = HoodieMetadataConfig.newBuilder.build
+ val engineContext = new HoodieLocalEngineContext(storageConf)
+ val viewManager: FileSystemViewManager =
FileSystemViewManager.createViewManagerWithTableMetadata(
+ engineContext,
+ metadataConfig,
+ FileSystemViewStorageConfig.newBuilder.build,
+ HoodieCommonConfig.newBuilder.build
+ )
+ val fsView: SyncableFileSystemView =
viewManager.getFileSystemView(metaClient)
+ (metaClient, fsView)
+ }
+
+ /**
+ * Replaces the existing file with an empty file which is meant to be
corrupted
+ * in a Hudi table.
+ *
+ * @param storage [[HoodieStorage]] instance
+ * @param filePath file path
+ */
+ def replaceWithEmptyFile(filePath: Path): Unit = {
+ val conf = new Configuration
+ val fs = FileSystem.get(conf)
+ fs.delete(filePath, true)
+ fs.create(filePath, true)
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDescribeTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDescribeTable.scala
new file mode 100644
index 000000000000..76c8f7b80691
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDescribeTable.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.ddl
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+import java.util.function.Predicate
+
+class TestDescribeTable extends HoodieSparkSqlTestBase {
+
+ test("Test desc hudi table command") {
+ withTempDir { tmp =>
+ val tbName = "wk_date"
+ val basePath = s"${tmp.getCanonicalPath}/$tbName"
+
+ spark.sql(
+ s"""
+ |create table $tbName (id int, driver string, precomb int, dat
string)
+ | using hudi
+ | partitioned by(dat)
+ | tblproperties(
+ | type='cow',
+ | primaryKey='id',
+ | preCombineField='precomb'
+ | )
+ | location '$basePath'
+ """.stripMargin)
+
+ // just for scala-2.11 compatibility
+ val locationInFirstColumn: Predicate[Row] = new Predicate[Row] {
+ def test(row: Row): Boolean = row(0).equals("Location")
+ }
+
+ spark.sql("set hoodie.schema.on.read.enable=false")
+ var output: java.util.List[Row] = spark.sql(s"describe extended
$tbName").collectAsList()
+ assert(output.stream().anyMatch(locationInFirstColumn))
+
+ spark.sql("set hoodie.schema.on.read.enable=true")
+ output = spark.sql(s"desc formatted $tbName").collectAsList()
+ assert(output.stream().anyMatch(locationInFirstColumn))
+
+ output = spark.sql(s"describe table extended $tbName").collectAsList()
+ assert(output.stream().anyMatch(locationInFirstColumn))
+
+ // DESC returns only columns and partitions when run without 'extended'
or 'formatted' keywords
+ output = spark.sql(s"describe table $tbName").collectAsList()
+ assert(output.stream().noneMatch(locationInFirstColumn))
+
+ output = spark.sql(s"desc table $tbName").collectAsList()
+ assert(output.stream().noneMatch(locationInFirstColumn))
+
+ output = spark.sql(s"desc $tbName").collectAsList()
+ assert(output.stream().noneMatch(locationInFirstColumn))
+ }
+ }
+
+ test("Test desc non-hudi table command") {
+ withTempDir { tmp =>
+ val tbName = "wk_date"
+ val basePath = s"${tmp.getCanonicalPath}/$tbName"
+
+ spark.sql(
+ s"""
+ |create table $tbName (
+ | id int,
+ | driver string,
+ | precomb int,
+ | dat string
+ |)
+ | using parquet
+ | location '$basePath'
+ """.stripMargin)
+
+ // just for scala-2.11 compatibility
+ val locationInFirstColumn: Predicate[Row] = new Predicate[Row] {
+ def test(row: Row): Boolean = row(0).equals("Location")
+ }
+
+ spark.sql("set hoodie.schema.on.read.enable=false")
+ var output: java.util.List[Row] = spark.sql(s"describe extended
$tbName").collectAsList()
+ assert(output.stream().anyMatch(locationInFirstColumn))
+
+ spark.sql("set hoodie.schema.on.read.enable=true")
+ output = spark.sql(s"desc formatted $tbName").collectAsList()
+ assert(output.stream().anyMatch(locationInFirstColumn))
+
+ output = spark.sql(s"describe table extended $tbName").collectAsList()
+ assert(output.stream().anyMatch(locationInFirstColumn))
+
+ // DESC returns only columns and partitions when run without 'extended'
or 'formatted' keywords
+ output = spark.sql(s"describe table $tbName").collectAsList()
+ assert(output.stream().noneMatch(locationInFirstColumn))
+
+ output = spark.sql(s"desc table $tbName").collectAsList()
+ assert(output.stream().noneMatch(locationInFirstColumn))
+
+ output = spark.sql(s"desc $tbName").collectAsList()
+ assert(output.stream().noneMatch(locationInFirstColumn))
+ }
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 80ee86ee6f21..76a1d540c924 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers,
HoodieSparkUtils, ScalaAssertionSupport}
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType,
StructField}
+import org.junit.jupiter.api.Assertions.assertTrue
class TestMergeIntoTable extends HoodieSparkSqlTestBase with
ScalaAssertionSupport {
@@ -28,6 +31,8 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with
ScalaAssertionSuppo
Seq(true, false).foreach { sparkSqlOptimizedWrites =>
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = false")
+ spark.sql("set hoodie.metadata.index.column.stats.enable = false")
+ spark.sql("set hoodie.metadata.index.partition.stats.enable = false")
val tableName = generateTableName
// Create table
spark.sql(
@@ -36,8 +41,10 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with
ScalaAssertionSuppo
| id int,
| name string,
| price double,
- | ts long
+ | ts int,
+ | partition string
|) using hudi
+ | partitioned by (partition)
| location '${tmp.getCanonicalPath}'
| tblproperties (
| primaryKey ='id',
@@ -45,76 +52,113 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| )
""".stripMargin)
- // test with optimized sql merge enabled / disabled.
- spark.sql(s"set
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
+ // test with optimized sql merge enabled / disabled.
+ spark.sql(s"set
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
- // First merge with a extra input field 'flag' (insert a new record)
- spark.sql(
- s"""
- | merge into $tableName
- | using (
- | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as
flag
- | ) s0
- | on s0.id = $tableName.id
- | when matched and flag = '1' then update set
- | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
- | when not matched and flag = '1' then insert *
+ val structFields = List(
+ StructField("id", IntegerType, nullable = true),
+ StructField("name", StringType, nullable = true),
+ StructField("price", DoubleType, nullable = true),
+ StructField("ts", IntegerType, nullable = true),
+ StructField("partition", StringType, nullable = true))
+        // First merge with an extra input field 'flag' (insert a new record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, 'p1'
as partition, '1' as flag
+ | union
+ | select 2 as id, 'a2' as name, 20 as price, 1000 as ts, 'p2'
as partition, '1' as flag
+ | union
+ | select 3 as id, 'a3' as name, 30 as price, 1000 as ts, 'p3'
as partition, '1' as flag
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched and flag = '1' then update set
+ | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+ | when not matched and flag = '1' then insert *
""".stripMargin)
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 10.0, 1000)
- )
+ validateTableSchema(tableName, structFields)
+ checkAnswer(s"select id, name, price, ts, partition from
$tableName")(
+ Seq(1, "a1", 10.0, 1000, "p1"),
+ Seq(2, "a2", 20.0, 1000, "p2"),
+ Seq(3, "a3", 30.0, 1000, "p3")
+ )
- // Second merge (update the record)
- spark.sql(
- s"""
- | merge into $tableName
- | using (
- | select 1 as id, 'a1' as name, 10 as price, 1001 as ts
- | ) s0
- | on s0.id = $tableName.id
- | when matched then update set
- | id = s0.id, name = s0.name, price = s0.price +
$tableName.price, ts = s0.ts
- | when not matched then insert *
- """.stripMargin)
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 20.0, 1001)
- )
+ // Second merge (update the record) with different field names in
the source
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as _id, 'a1' as name, 10 as _price, 1001 as _ts,
'p1' as partition
+ | ) s0
+ | on s0._id = $tableName.id
+ | when matched then update set
+ | id = s0._id, name = s0.name, price = s0._price +
$tableName.price, ts = s0._ts
+ | """.stripMargin)
+ validateTableSchema(tableName, structFields)
+ checkAnswer(s"select id, name, price, ts, partition from
$tableName")(
+ Seq(1, "a1", 20.0, 1001, "p1"),
+ Seq(2, "a2", 20.0, 1000, "p2"),
+ Seq(3, "a3", 30.0, 1000, "p3")
+ )
- // the third time merge (update & insert the record)
- spark.sql(
- s"""
- | merge into $tableName
- | using (
- | select * from (
- | select 1 as id, 'a1' as name, 10 as price, 1002 as ts
- | union all
- | select 2 as id, 'a2' as name, 12 as price, 1001 as ts
- | )
- | ) s0
- | on s0.id = $tableName.id
- | when matched then update set
- | id = s0.id, name = s0.name, price = s0.price +
$tableName.price, ts = s0.ts
- | when not matched and s0.id % 2 = 0 then insert *
+ // the third time merge (update & insert the record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select * from (
+ | select 1 as id, 'a1' as name, 10 as price, 1002 as ts, 'p1'
as partition
+ | union all
+ | select 4 as id, 'a4' as name, 40 as price, 1001 as ts, 'p4'
as partition
+ | )
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched then update set
+ | id = s0.id, name = s0.name, price = s0.price +
$tableName.price, ts = s0.ts
+ | when not matched and s0.id % 2 = 0 then insert *
""".stripMargin)
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 30.0, 1002),
- Seq(2, "a2", 12.0, 1001)
+ validateTableSchema(tableName, structFields)
+ checkAnswer(s"select id, name, price, ts, partition from $tableName")(
+ Seq(1, "a1", 30.0, 1002, "p1"),
+ Seq(2, "a2", 20.0, 1000, "p2"),
+ Seq(3, "a3", 30.0, 1000, "p3"),
+ Seq(4, "a4", 40.0, 1001, "p4")
)
+        // Validate that MERGE INTO only scans affected partitions in the
target table
+        // Corrupt the files in other partitions not receiving updates
+ val (metaClient, fsv) =
HoodieSparkSqlTestBase.getMetaClientAndFileSystemView(tmp.getCanonicalPath)
+ Seq("p2", "p3", "p4").map(e => "partition=" + e).foreach(partition => {
+ assertTrue(fsv.getLatestFileSlices(partition).count() > 0)
+ fsv.getLatestFileSlices(partition).forEach(new
java.util.function.Consumer[FileSlice] {
+ override def accept(fileSlice: FileSlice): Unit = {
+ if (fileSlice.getBaseFile.isPresent) {
+ HoodieSparkSqlTestBase.replaceWithEmptyFile(
+ fileSlice.getBaseFile.get.getHadoopPath)
+ }
+ fileSlice.getLogFiles.forEach(new
java.util.function.Consumer[HoodieLogFile] {
+ override def accept(logFile: HoodieLogFile): Unit = {
+ HoodieSparkSqlTestBase.replaceWithEmptyFile(logFile.getPath)
+ }
+ })
+ }
+ })
+ })
// the fourth merge (delete the record)
spark.sql(
s"""
| merge into $tableName
| using (
- | select 1 as id, 'a1' as name, 12 as price, 1003 as ts
+ | select 1 as id, 'a1' as name, 12 as price, 1003 as ts, 'p1' as
partition
| ) s0
| on s0.id = $tableName.id
| when matched and s0.id != 1 then update set
| id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
| when matched and s0.id = 1 then delete
| when not matched then insert *
- """.stripMargin)
- val cnt = spark.sql(s"select * from $tableName where id = 1").count()
+ | """.stripMargin)
+ val cnt = spark.sql(s"select * from $tableName where partition =
'p1'").count()
assertResult(0)(cnt)
})
}
diff --git
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
index d64bc94301a1..7917f93de539 100644
---
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
+++
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
@@ -17,11 +17,12 @@
package org.apache.spark.sql.hudi.analysis
-import org.apache.hadoop.fs.Path
import org.apache.hudi.{DataSourceReadOptions, DefaultSource,
SparkAdapterSupport}
+
+import org.apache.hadoop.fs.Path
import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.MatchResolvedTable
import
org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolveExpressionByPlanChildren
-import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt,
EliminateSubqueryAliases, NamedRelation, UnresolvedAttribute,
UnresolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases,
NamedRelation, UnresolvedAttribute, UnresolvedPartitionSpec}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery,
HoodieTableChanges, HoodieTableChangesOptionsParser}
@@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
import org.apache.spark.sql.connector.catalog.{Table, V1Table}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.command.DescribeTableCommand
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
@@ -280,6 +282,9 @@ case class HoodieSpark32PlusPostAnalysisRule(sparkSession:
SparkSession) extends
retainData = true
)
+ case DescribeRelation(MatchResolvedTable(_, id, HoodieV1OrV2Table(_)),
partitionSpec, isExtended, output) =>
+ DescribeTableCommand(id.asTableIdentifier, partitionSpec, isExtended,
output)
+
case _ => plan
}
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 7d9b2afa33ce..fbb9dac49677 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -319,6 +319,7 @@ public class HiveSyncTool extends HoodieSyncTool implements
AutoCloseable {
// Sync the partitions if needed
// find dropped partitions, if any, in the latest commit
Set<String> droppedPartitions =
syncClient.getDroppedPartitionsSince(lastCommitTimeSynced,
lastCommitCompletionTimeSynced);
+ LOG.info("Partitions dropped since last sync: {}",
droppedPartitions.size());
partitionsChanged = syncPartitions(tableName, writtenPartitionsSince,
droppedPartitions);
}
return partitionsChanged;
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 5bd4838a62f5..732a15845c89 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -1281,6 +1281,129 @@ public class TestHiveSyncTool {
"The last commit that was synced should be updated in the
TBLPROPERTIES");
}
+ @ParameterizedTest
+ @MethodSource("syncModeAndEnablePushDown")
+ void testGetPartitionEvents_droppedStoragePartitionNotPresentInMetastore(
+ String syncMode, String enablePushDown) throws Exception {
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+ hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(),
enablePushDown);
+
+ // Create a table with 1 partition
+ String instantTime1 = "100";
+ HiveTestUtil.createCOWTable(instantTime1, 1, true);
+
+ reInitHiveSyncClient();
+ // Sync the table to metastore
+ reSyncHiveTable();
+
+ List<Partition> partitionsInMetastore =
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+ assertEquals(1, partitionsInMetastore.size(), "Should have 1 partition in
metastore");
+
+ // Add a partition to storage but don't sync it to metastore
+ String instantTime2 = "101";
+ String newPartition = "2010/02/01";
+ HiveTestUtil.addCOWPartition(newPartition, true, true, instantTime2);
+
+ // Verify the partition is not in metastore yet
+ partitionsInMetastore =
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+ assertEquals(1, partitionsInMetastore.size(), "Should have 1 partition in
metastore");
+
+ // Delete the partition that was never synced to metastore
+ String instantTime3 = "102";
+ HiveTestUtil.createReplaceCommit(instantTime3, newPartition,
WriteOperationType.DELETE_PARTITION, true, true);
+
+ // Add another partition to storage but don't sync to metastore
+ String instantTime4 = "103";
+ String addPartition = "2010/04/01";
+ HiveTestUtil.addCOWPartition(addPartition, true, true, instantTime4);
+
+ reInitHiveSyncClient();
+
+ Set<String> droppedPartitionsOnStorage =
hiveClient.getDroppedPartitionsSince(Option.of(instantTime1),
Option.of(instantTime1));
+ List<String> writtenPartitionsOnStorage =
hiveClient.getWrittenPartitionsSince(Option.of(instantTime1),
Option.of(instantTime1));
+ List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(
+ partitionsInMetastore, writtenPartitionsOnStorage,
droppedPartitionsOnStorage);
+
+ // Verify no DROP event is generated for partition that was never in
metastore
+ long dropEvents = partitionEvents.stream()
+ .filter(e -> e.eventType == PartitionEventType.DROP)
+ .count();
+ assertEquals(0, dropEvents,
+ "No DROP partition event should be generated for partition that was
never in metastore");
+
+ // Verify ADD event is generated for the new partition that was added to
storage
+ List<PartitionEvent> addEvents = partitionEvents.stream()
+ .filter(e -> e.eventType == PartitionEventType.ADD)
+ .collect(Collectors.toList());
+ assertEquals(1, addEvents.size(),
+ "ADD partition event should be generated for new partition added to
storage");
+ assertEquals(addPartition, addEvents.get(0).storagePartition);
+ }
+
+ @ParameterizedTest
+ @MethodSource("syncModeAndEnablePushDown")
+ void testGetPartitionEvents_droppedStoragePartitionPresentInMetastore(
+ String syncMode, String enablePushDown) throws Exception {
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+ hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(),
enablePushDown);
+
+ // Create a table with 1 partition
+ String instantTime1 = "100";
+ HiveTestUtil.createCOWTable(instantTime1, 1, true);
+
+ reInitHiveSyncClient();
+ // Sync the table to metastore
+ reSyncHiveTable();
+
+ List<Partition> partitionsInMetastore =
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+ assertEquals(1, partitionsInMetastore.size(), "Should have 1 partition in
metastore");
+
+ // Add a partition and sync it to metastore
+ String instantTime2 = "101";
+ String newPartition = "2010/02/01";
+ HiveTestUtil.addCOWPartition(newPartition, true, true, instantTime2);
+
+ reInitHiveSyncClient();
+ // Sync the table to metastore
+ reSyncHiveTable();
+
+ partitionsInMetastore =
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+ assertEquals(2, partitionsInMetastore.size(), "Should have 2 partitions in
metastore");
+
+ // Now delete the partition that exists in metastore
+ String instantTime3 = "102";
+ HiveTestUtil.createReplaceCommit(instantTime3, newPartition,
WriteOperationType.DELETE_PARTITION, true, true);
+
+ // Add another partition to storage but don't sync to metastore
+ String instantTime4 = "103";
+ String addPartition = "2010/04/01";
+ HiveTestUtil.addCOWPartition(addPartition, true, true, instantTime4);
+
+ reInitHiveSyncClient();
+
+ // Get partition events
+ Set<String> droppedPartitionsOnStorage =
hiveClient.getDroppedPartitionsSince(Option.of(instantTime2),
Option.of(instantTime2));
+ List<String> writtenPartitionsOnStorage =
hiveClient.getWrittenPartitionsSince(Option.of(instantTime2),
Option.of(instantTime2));
+ List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(
+ partitionsInMetastore, writtenPartitionsOnStorage,
droppedPartitionsOnStorage);
+
+ // Verify DROP event is generated for partition that exists in metastore
+ List<PartitionEvent> dropEvents = partitionEvents.stream()
+ .filter(e -> e.eventType == PartitionEventType.DROP)
+ .collect(Collectors.toList());
+ assertEquals(1, dropEvents.size(),
+ "DROP partition event should be generated for partition that exists in
metastore");
+ assertEquals(newPartition, dropEvents.get(0).storagePartition);
+
+ // Verify ADD event is generated for the new partition that was added to
storage
+ List<PartitionEvent> addEvents = partitionEvents.stream()
+ .filter(e -> e.eventType == PartitionEventType.ADD)
+ .collect(Collectors.toList());
+ assertEquals(1, addEvents.size(),
+ "ADD partition event should be generated for new partition added to
storage");
+ assertEquals(addPartition, addEvents.get(0).storagePartition);
+ }
+
@ParameterizedTest
@MethodSource("syncModeAndEnablePushDown")
public void testNonPartitionedSync(String syncMode, String enablePushDown)
throws Exception {
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index 6ec35c435a05..6f0faca58fa1 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -213,11 +213,14 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues =
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
- if (droppedPartitionsOnStorage.contains(storagePartition)) {
- events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
- } else {
- if (!storagePartitionValues.isEmpty()) {
- String storageValue = String.join(", ", storagePartitionValues);
+ if (!storagePartitionValues.isEmpty()) {
+ String storageValue = String.join(", ", storagePartitionValues);
+ if (droppedPartitionsOnStorage.contains(storagePartition)) {
+ if (paths.containsKey(storageValue)) {
+ // Add partition drop event only if it exists in the metastore
+ events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
+ }
+ } else {
if (!paths.containsKey(storageValue)) {
events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
} else if
(!paths.get(storageValue).equals(fullStoragePartitionPath)) {
diff --git a/packaging/bundle-validation/ci_run.sh
b/packaging/bundle-validation/ci_run.sh
index 505ee9c7c2d4..599c9b0b019d 100755
--- a/packaging/bundle-validation/ci_run.sh
+++ b/packaging/bundle-validation/ci_run.sh
@@ -180,6 +180,12 @@ fi
ls -l $TMP_JARS_DIR
+# Fail early if Spark bundle is missing (validate.sh requires it)
+if [ -z "$(ls $TMP_JARS_DIR/hudi-spark*.jar 2>/dev/null)" ]; then
+ echo "Error: Hudi Spark bundle jar not found in $TMP_JARS_DIR. Ensure the
build produces packaging/hudi-spark-bundle/target/hudi-*-$HUDI_VERSION.jar and
ci_run.sh copies it."
+ exit 1
+fi
+
# Copy test dataset
TMP_DATA_DIR=/tmp/data/$(date +%s)
mkdir -p $TMP_DATA_DIR/stocks/data
diff --git a/packaging/bundle-validation/validate.sh
b/packaging/bundle-validation/validate.sh
index 75d4227c74a3..eefeee2a1ce1 100755
--- a/packaging/bundle-validation/validate.sh
+++ b/packaging/bundle-validation/validate.sh
@@ -38,6 +38,13 @@ ln -sf $JARS_DIR/hudi-utilities-slim*.jar
$JARS_DIR/utilities-slim.jar
ln -sf $JARS_DIR/hudi-kafka-connect-bundle*.jar $JARS_DIR/kafka-connect.jar
ln -sf $JARS_DIR/hudi-metaserver-server-bundle*.jar $JARS_DIR/metaserver.jar
+# Resolve spark bundle jar to actual file (symlink may be broken if no
hudi-spark*.jar was mounted)
+SPARK_JAR=$(ls $JARS_DIR/hudi-spark*.jar 2>/dev/null | head -1)
+if [ -z "$SPARK_JAR" ] || [ ! -f "$SPARK_JAR" ]; then
+ echo "::error::validate.sh Hudi Spark bundle jar not found in $JARS_DIR (no
hudi-spark*.jar)"
+ exit 1
+fi
+
##
# Function to change Java runtime version by changing JAVA_HOME
##
@@ -76,15 +83,17 @@ test_spark_hadoop_mr_bundles () {
local HIVE_PID=$!
change_java_runtime_version
echo "::warning::validate.sh Writing sample data via Spark DataSource and
run Hive Sync..."
- $SPARK_HOME/bin/spark-shell --jars $JARS_DIR/spark.jar <
$WORKDIR/spark_hadoop_mr/write.scala
+ # Use resolved SPARK_JAR and driver classpath so HoodieCatalog
(spark-defaults.conf for Spark 3.2/3.3) is visible
+ $SPARK_HOME/bin/spark-shell --jars "$SPARK_JAR" --conf
"spark.driver.extraClassPath=$SPARK_JAR" < $WORKDIR/spark_hadoop_mr/write.scala
echo "::warning::validate.sh Query and validate the results using Spark
SQL"
# save Spark SQL query results
- $SPARK_HOME/bin/spark-shell --jars $JARS_DIR/spark.jar \
+ $SPARK_HOME/bin/spark-shell --jars "$SPARK_JAR" --conf
"spark.driver.extraClassPath=$SPARK_JAR" \
-i <(echo 'spark.sql("select * from
trips").coalesce(1).write.csv("/tmp/spark-bundle/sparksql/trips/results");
System.exit(0)')
- numRecords=$(cat /tmp/spark-bundle/sparksql/trips/results/*.csv | wc -l)
+ numRecords=$(cat /tmp/spark-bundle/sparksql/trips/results/*.csv
2>/dev/null | wc -l)
+ numRecords=${numRecords:-0}
if [ "$numRecords" -ne 10 ]; then
- echo "::error::validate.sh Spark SQL validation failed."
+ echo "::error::validate.sh Spark SQL validation failed (expected 10
records, got $numRecords)."
exit 1
fi
echo "::warning::validate.sh Query and validate the results using HiveQL"
@@ -246,13 +255,14 @@ test_metaserver_bundle () {
change_java_runtime_version
echo "::warning::validate.sh Writing sample data via Spark DataSource."
- $SPARK_HOME/bin/spark-shell --jars $JARS_DIR/spark.jar <
$WORKDIR/service/write.scala
+ $SPARK_HOME/bin/spark-shell --jars "$SPARK_JAR" <
$WORKDIR/service/write.scala
ls /tmp/hudi-bundles/tests/trips
echo "::warning::validate.sh Query and validate the results using Spark
DataSource"
# save Spark DataSource query results
- $SPARK_HOME/bin/spark-shell --jars $JARS_DIR/spark.jar <
$WORKDIR/service/read.scala
- numRecords=$(cat
/tmp/metaserver-bundle/sparkdatasource/trips/results/*.csv | wc -l)
+ $SPARK_HOME/bin/spark-shell --jars "$SPARK_JAR" <
$WORKDIR/service/read.scala
+ numRecords=$(cat
/tmp/metaserver-bundle/sparkdatasource/trips/results/*.csv 2>/dev/null | wc -l)
+ numRecords=${numRecords:-0}
echo $numRecords
use_default_java_runtime
if [ "$numRecords" -ne 10 ]; then
@@ -289,7 +299,7 @@ else
fi
echo "::warning::validate.sh validating utilities slim bundle"
-test_utilities_bundle $JARS_DIR/utilities-slim.jar $JARS_DIR/spark.jar
+test_utilities_bundle $JARS_DIR/utilities-slim.jar $SPARK_JAR
if [ "$?" -ne 0 ]; then
exit 1
fi