This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 4e658041a95 [HUDI-8552] Add a new merge handle based on file group reader for compaction in Spark (#12390)
4e658041a95 is described below
commit 4e658041a950b3b5bddccdbea440bae5ffa211cb
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Dec 4 18:19:01 2024 -0800
    [HUDI-8552] Add a new merge handle based on file group reader for compaction in Spark (#12390)
---
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 19 +-
.../apache/hudi/table/EngineBroadcastManager.java | 58 +++++
.../apache/hudi/table/HoodieCompactionHandler.java | 11 +
.../hudi/table/action/compact/HoodieCompactor.java | 59 ++++-
.../client/common/HoodieSparkEngineContext.java | 4 +
...HoodieSparkFileGroupReaderBasedMergeHandle.java | 268 +++++++++++++++++++++
.../io/storage/HoodieSparkFileWriterFactory.java | 13 +
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 16 ++
.../apache/hudi/table/SparkBroadcastManager.java | 113 +++++++++
.../HoodieSparkMergeOnReadTableCompactor.java | 8 +
.../SparkFileFormatInternalRowReaderContext.scala | 6 +-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 27 +++
.../hudi/common/engine/HoodieEngineContext.java | 4 +
.../apache/hudi/common/model/HoodieWriteStat.java | 24 +-
.../read/HoodieBaseFileGroupRecordBuffer.java | 84 +++++--
.../common/table/read/HoodieFileGroupReader.java | 7 +
.../table/read/HoodieFileGroupRecordBuffer.java | 5 +
.../read/HoodieKeyBasedFileGroupRecordBuffer.java | 7 +-
.../hudi/common/table/read/HoodieReadStats.java | 72 ++++++
.../testutils/reader/HoodieTestReaderContext.java | 3 +
.../hudi/hadoop/utils/ObjectInspectorCache.java | 7 +-
...odieFileGroupReaderBasedParquetFileFormat.scala | 7 +-
hudi-spark-datasource/hudi-spark/pom.xml | 7 +
.../hudi/client/TestHoodieClientMultiWriter.java | 0
.../TestMultiWriterWithPreferWriterIngestion.java | 0
.../hudi/client/TestTableSchemaEvolution.java | 0
.../functional/TestConsistentBucketIndex.java | 0
...DataValidationCheckForLogCompactionActions.java | 0
.../functional/TestHoodieBackedMetadata.java | 0
.../TestHoodieClientOnMergeOnReadStorage.java | 0
.../TestMetadataUtilRLIandSIRecordGeneration.java | 0
.../TestRemoteFileSystemViewWithMetadataTable.java | 0
.../TestSavepointRestoreMergeOnRead.java | 0
.../hudi/table/TestHoodieMergeOnReadTable.java | 0
.../table/action/compact/TestAsyncCompaction.java | 30 ++-
.../table/action/compact/TestHoodieCompactor.java | 13 +-
.../table/action/compact/TestInlineCompaction.java | 0
.../TestMergeOnReadRollbackActionExecutor.java | 0
.../TestHoodieSparkMergeOnReadTableCompaction.java | 0
...HoodieSparkMergeOnReadTableIncrementalRead.java | 0
...dieSparkMergeOnReadTableInsertUpdateDelete.java | 0
.../TestHoodieSparkMergeOnReadTableRollback.java | 4 +-
.../TestSparkNonBlockingConcurrencyControl.java | 0
.../read/TestHoodieFileGroupReaderOnSpark.scala | 2 +-
.../TestSpark35RecordPositionMetadataColumn.scala | 2 +-
.../functional/TestColumnStatsIndexWithSQL.scala | 5 +-
.../hudi/functional/TestMORDataSourceStorage.scala | 8 +-
.../hudi/dml/TestPartialUpdateForMergeInto.scala | 38 ++-
.../TestHoodieDeltaStreamerWithMultiWriter.java | 1 +
49 files changed, 837 insertions(+), 95 deletions(-)
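Note for readers of this patch: the new compaction path is gated behind the file group reader flag checked in HoodieCompactor below. A minimal sketch (not part of this commit) of opting in from a writer config follows; the flag is HoodieReaderConfig.FILE_GROUP_READER_ENABLED referenced in this patch, while the base path and schema string are placeholders.

    import java.util.Collections;

    import org.apache.hudi.common.config.HoodieReaderConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    class FileGroupReaderCompactionConfigSketch {
      // Hedged sketch: build a write config that opts into the new compaction path.
      // basePath and schemaJson are placeholders, not values from the commit.
      static HoodieWriteConfig build(String basePath, String schemaJson) {
        return HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withSchema(schemaJson)
            .withProps(Collections.singletonMap(
                HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"))
            .build();
      }
    }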
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 707c86dd73a..31f221fb85f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -113,7 +113,7 @@ public class HoodieMergeHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O>
protected long updatedRecordsWritten = 0;
protected long insertRecordsWritten = 0;
protected Option<BaseKeyGenerator> keyGeneratorOpt;
- private HoodieBaseFile baseFileToMerge;
+ protected HoodieBaseFile baseFileToMerge;
protected Option<String[]> partitionFields = Option.empty();
protected Object[] partitionValues = new Object[0];
@@ -147,6 +147,21 @@ public class HoodieMergeHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O>
validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
}
+  /**
+   * Used by `HoodieSparkFileGroupReaderBasedMergeHandle`.
+   *
+   * @param config              Hudi write config
+   * @param instantTime         Instant time to use
+   * @param partitionPath       Partition path
+   * @param fileId              File group ID for the merge handle to operate on
+   * @param hoodieTable         {@link HoodieTable} instance
+   * @param taskContextSupplier Task context supplier
+   */
+  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
+                           String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
+  }
+
   private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator> keyGeneratorOpt, boolean populateMetaFields) {
     ValidationUtils.checkArgument(populateMetaFields == !keyGeneratorOpt.isPresent());
this.keyGeneratorOpt = keyGeneratorOpt;
@@ -467,7 +482,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
   }
   public void performMergeDataValidationCheck(WriteStatus writeStatus) {
-    if (!config.isMergeDataValidationCheckEnabled()) {
+    if (!config.isMergeDataValidationCheckEnabled() || baseFileToMerge == null) {
return;
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/EngineBroadcastManager.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/EngineBroadcastManager.java
new file mode 100644
index 00000000000..bcee2829dd8
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/EngineBroadcastManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.Serializable;
+
+/**
+ * Broadcast variable management for engines.
+ */
+public class EngineBroadcastManager implements Serializable {
+
+  /**
+   * Prepares and broadcasts necessary information needed by the compactor.
+   */
+  public void prepareAndBroadcast() {
+    // No operation.
+  }
+
+  /**
+   * Returns the {@link HoodieReaderContext} instance needed by the file group reader based on
+   * the broadcast variables.
+   *
+   * @param basePath Table base path
+   */
+  public Option<HoodieReaderContext> retrieveFileGroupReaderContext(StoragePath basePath) {
+    return Option.empty();
+  }
+
+ /**
+ * Retrieves the broadcast configuration.
+ */
+ public Option<Configuration> retrieveStorageConfig() {
+ return Option.empty();
+ }
+}
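The base class above is a no-op; a sketch of the intended call pattern (mirroring the HoodieCompactor change later in this patch) is below. The `context` and `metaClient` variables are assumed to be in scope and are not part of this file.

    // Hedged sketch of how a compactor consumes the manager.
    Option<EngineBroadcastManager> managerOpt = getEngineBroadcastManager(context);
    managerOpt.ifPresent(EngineBroadcastManager::prepareAndBroadcast);   // on the driver
    if (managerOpt.isPresent()) {
      // On the executors, rebuild what the file group reader needs from the broadcasts.
      HoodieReaderContext readerContext =
          managerOpt.get().retrieveFileGroupReaderContext(metaClient.getBasePath()).get();
      Configuration storageConf = managerOpt.get().retrieveStorageConfig().get();
    }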
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
index 4d6a216cf23..5af5213cc09 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
@@ -20,11 +20,15 @@
package org.apache.hudi.table;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hadoop.conf.Configuration;
+
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@@ -42,6 +46,13 @@ public interface HoodieCompactionHandler<T> {
   Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
                                            Map<String, HoodieRecord<?>> recordMap);
+  default List<WriteStatus> compactUsingFileGroupReader(String instantTime,
+                                                        CompactionOperation operation,
+                                                        HoodieReaderContext readerContext,
+                                                        Configuration conf) {
+    throw new HoodieNotSupportedException("This engine does not support file group reader based compaction.");
+  }
+
   default Iterator<List<WriteStatus>> handleInsertsForLogCompaction(String instantTime, String partitionPath, String fileId,
                                                                     Map<String, HoodieRecord<?>> recordMap,
                                                                     Map<HoodieLogBlock.HeaderMetadataType, String> header) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index e92c2a5d7c2..96f9316902d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -45,6 +46,7 @@ import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.EngineBroadcastManager;
import org.apache.hudi.table.HoodieCompactionHandler;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
@@ -87,6 +89,15 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
    */
   public abstract void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieEngineContext context, HoodieWriteConfig config, String instantTime);
+  /**
+   * @param context {@link HoodieEngineContext} instance
+   *
+   * @return the {@link EngineBroadcastManager} if available.
+   */
+  public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context) {
+    return Option.empty();
+  }
+
/**
* Execute compaction operations and report back status.
*/
@@ -130,13 +141,32 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
     TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
     // if this is a MDT, set up the instant range of log reader just like regular MDT snapshot reader.
     Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);
-    return context.parallelize(operations).map(operation -> compact(
-        compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime, instantRange, taskContextSupplier, executionHelper))
-        .flatMap(List::iterator);
+
+    boolean useFileGroupReaderBasedCompaction = context.supportsFileGroupReader() // the engine needs to support fg reader first
+        && !metaClient.isMetadataTable()
+        && config.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
+        && !hasBootstrapFile(operations) // bootstrap file read for fg reader is not ready
+        && StringUtils.isNullOrEmpty(config.getInternalSchema()) // schema evolution support for fg reader is not ready
+        && !containsUnsupportedTypesForFileGroupReader(config.getSchema()) // Enum type support by fg reader is not ready
+        && config.populateMetaFields(); // Virtual key support by fg reader is not ready
+
+    if (useFileGroupReaderBasedCompaction) {
+      Option<EngineBroadcastManager> broadcastManagerOpt = getEngineBroadcastManager(context);
+      // Broadcast required information.
+      broadcastManagerOpt.ifPresent(EngineBroadcastManager::prepareAndBroadcast);
+      return context.parallelize(operations).map(
+          operation -> compact(compactionHandler, metaClient, operation, compactionInstantTime, broadcastManagerOpt))
+          .flatMap(List::iterator);
+    } else {
+      return context.parallelize(operations).map(
+          operation -> compact(compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime,
+              instantRange, taskContextSupplier, executionHelper))
+          .flatMap(List::iterator);
+    }
   }
/**
- * Execute a single compaction operation and report back status.
+   * Execute a single compaction operation using file group reader and report back status.
*/
public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
HoodieTableMetaClient metaClient,
@@ -237,6 +267,7 @@ public abstract class HoodieCompactor<T, I, K, O>
implements Serializable {
Iterator<List<WriteStatus>> result;
result = executionHelper.writeFileAndGetWriteStats(compactionHandler,
operation, instantTime, scanner, oldDataFileOpt);
scanner.close();
+
Iterable<List<WriteStatus>> resultIterable = () -> result;
return StreamSupport.stream(resultIterable.spliterator(),
false).flatMap(Collection::stream).peek(s -> {
final HoodieWriteStat stat = s.getStat();
@@ -261,6 +292,19 @@ public abstract class HoodieCompactor<T, I, K, O>
implements Serializable {
}).collect(toList());
}
+  /**
+   * Execute a single compaction operation and report back status.
+   */
+  public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
+                                   HoodieTableMetaClient metaClient,
+                                   CompactionOperation operation,
+                                   String instantTime,
+                                   Option<EngineBroadcastManager> broadcastManagerOpt) throws IOException {
+    return compactionHandler.compactUsingFileGroupReader(instantTime,
+        operation,
+        broadcastManagerOpt.get().retrieveFileGroupReaderContext(metaClient.getBasePath()).get(),
+        broadcastManagerOpt.get().retrieveStorageConfig().get());
+  }
+
public String getMaxInstantTime(HoodieTableMetaClient metaClient) {
String maxInstantTime = metaClient
.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
@@ -278,4 +322,11 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
     }
   }
+  private boolean hasBootstrapFile(List<CompactionOperation> operationList) {
+    return operationList.stream().anyMatch(operation -> operation.getBootstrapFilePath().isPresent());
+  }
+
+  private boolean containsUnsupportedTypesForFileGroupReader(String schemaStr) {
+    return HoodieAvroUtils.containsUnsupportedTypesForFileGroupReader(new Schema.Parser().parse(schemaStr));
+  }
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index b1763634bc0..6485e8e6ad4 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -108,6 +108,10 @@ public class HoodieSparkEngineContext extends
HoodieEngineContext {
return HoodieJavaRDD.of(javaSparkContext.emptyRDD());
}
+ public boolean supportsFileGroupReader() {
+ return true;
+ }
+
@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
return HoodieJavaRDD.of(javaSparkContext.parallelize(data, parallelism));
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
new file mode 100644
index 00000000000..b7d8d2f80f2
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import
org.apache.hudi.common.table.read.HoodieFileGroupReader.HoodieFileGroupReaderIterator;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.config.HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS;
+
+/**
+ * A merge handle implementation based on the {@link HoodieFileGroupReader}.
+ * <p>
+ * This merge handle is used for compaction on Spark: it passes the file slice from the
+ * compaction operation of a single file group to a file group reader, gets an iterator of
+ * the records, and writes the records to a new base file.
+ */
+@NotThreadSafe
+public class HoodieSparkFileGroupReaderBasedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieSparkFileGroupReaderBasedMergeHandle.class);
+
+ protected HoodieReaderContext readerContext;
+ protected FileSlice fileSlice;
+ protected Configuration conf;
+
+  public HoodieSparkFileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                                    CompactionOperation operation, TaskContextSupplier taskContextSupplier,
+                                                    Option<BaseKeyGenerator> keyGeneratorOpt,
+                                                    HoodieReaderContext readerContext, Configuration conf) {
+    super(config, instantTime, operation.getPartitionPath(), operation.getFileId(), hoodieTable, taskContextSupplier);
+    this.keyToNewRecords = Collections.emptyMap();
+    this.readerContext = readerContext;
+    this.conf = conf;
+    Option<HoodieBaseFile> baseFileOpt =
+        operation.getBaseFile(config.getBasePath(), operation.getPartitionPath());
+    List<HoodieLogFile> logFiles = operation.getDeltaFileNames().stream().map(p ->
+            new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
+                config.getBasePath(), operation.getPartitionPath()), p)))
+        .collect(Collectors.toList());
+    this.fileSlice = new FileSlice(
+        operation.getFileGroupId(),
+        operation.getBaseInstantTime(),
+        baseFileOpt.isPresent() ? baseFileOpt.get() : null,
+        logFiles);
+    this.preserveMetadata = true;
+    init(operation, this.partitionPath, baseFileOpt);
+    validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
+  }
+
+  private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator> keyGeneratorOpt, boolean populateMetaFields) {
+    ValidationUtils.checkArgument(populateMetaFields == !keyGeneratorOpt.isPresent());
+ this.keyGeneratorOpt = keyGeneratorOpt;
+ }
+
+  private void init(CompactionOperation operation, String partitionPath, Option<HoodieBaseFile> baseFileToMerge) {
+    LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId);
+    this.baseFileToMerge = baseFileToMerge.orElse(null);
+    this.writtenRecordKeys = new HashSet<>();
+    writeStatus.setStat(new HoodieWriteStat());
+    writeStatus.getStat().setTotalLogSizeCompacted(
+        operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
+    try {
+      Option<String> latestValidFilePath = Option.empty();
+      if (baseFileToMerge.isPresent()) {
+        latestValidFilePath = Option.of(baseFileToMerge.get().getFileName());
+        writeStatus.getStat().setPrevCommit(baseFileToMerge.get().getCommitTime());
+        // At the moment, we only support SI for overwrite with latest payload. So, we don't need to embed entire file slice here.
+        // HUDI-8518 will be taken up to fix it for any payload during which we might require entire file slice to be set here.
+        // Already AppendHandle adds all logs file from current file slice to HoodieDeltaWriteStat.
+        writeStatus.getStat().setPrevBaseFile(latestValidFilePath.get());
+      } else {
+        writeStatus.getStat().setPrevCommit(HoodieWriteStat.NULL_COMMIT);
+      }
+
+      HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime,
+          new StoragePath(config.getBasePath()),
+          FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath),
+          hoodieTable.getPartitionMetafileFormat());
+      partitionMetadata.trySave();
+
+      String newFileName = FSUtils.makeBaseFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());
+      makeOldAndNewFilePaths(partitionPath,
+          latestValidFilePath.isPresent() ? latestValidFilePath.get() : null, newFileName);
+
+      LOG.info(String.format(
+          "Merging data from file group %s, to a new base file %s", fileId, newFilePath.toString()));
+      // file name is same for all records, in this bunch
+      writeStatus.setFileId(fileId);
+      writeStatus.setPartitionPath(partitionPath);
+      writeStatus.getStat().setPartitionPath(partitionPath);
+      writeStatus.getStat().setFileId(fileId);
+      setWriteStatusPath();
+
+      // Create Marker file,
+      // uses name of `newFilePath` instead of `newFileName`
+      // in case the sub-class may roll over the file handle name.
+      createMarkerFile(partitionPath, newFilePath.getName());
+
+      // Create the writer for writing the new version file
+      fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable.getStorage(),
+          config, writeSchemaWithMetaFields, taskContextSupplier, HoodieRecord.HoodieRecordType.SPARK);
+    } catch (IOException io) {
+      LOG.error("Error in update task at commit " + instantTime, io);
+      writeStatus.setGlobalError(io);
+      throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit "
+          + instantTime + " on path " + hoodieTable.getMetaClient().getBasePath(), io);
+    }
+  }
+
+  /**
+   * Reads the file slice of a compaction operation using a file group reader,
+   * gets an iterator of the records, and writes the records to a new base file
+   * using the Spark parquet writer.
+   */
+  public void write() {
+    boolean usePosition = config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
+    Option<InternalSchema> internalSchemaOption = Option.empty();
+    if (!StringUtils.isNullOrEmpty(config.getInternalSchema())) {
+      internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
+    }
+    // Initializes file group reader
+    try (HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
+        readerContext,
+        storage.newInstance(hoodieTable.getMetaClient().getBasePath(), new HadoopStorageConfiguration(conf)),
+        hoodieTable.getMetaClient().getBasePath().toString(),
+        instantTime,
+        fileSlice,
+        writeSchemaWithMetaFields,
+        writeSchemaWithMetaFields,
+        internalSchemaOption,
+        hoodieTable.getMetaClient(),
+        hoodieTable.getMetaClient().getTableConfig().getProps(),
+        0,
+        Long.MAX_VALUE,
+        usePosition)) {
+      fileGroupReader.initRecordIterators();
+      // Reads the records from the file slice
+      try (HoodieFileGroupReaderIterator<InternalRow> recordIterator
+               = (HoodieFileGroupReaderIterator<InternalRow>) fileGroupReader.getClosableIterator()) {
+        StructType sparkSchema = AvroConversionUtils.convertAvroSchemaToStructType(writeSchemaWithMetaFields);
+        while (recordIterator.hasNext()) {
+          // Constructs Spark record for the Spark Parquet file writer
+          InternalRow row = recordIterator.next();
+          HoodieKey recordKey = new HoodieKey(
+              row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD),
+              row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD));
+          HoodieSparkRecord record = new HoodieSparkRecord(recordKey, row, sparkSchema, false);
+          Option recordMetadata = record.getMetadata();
+          if (!partitionPath.equals(record.getPartitionPath())) {
+            HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
+                + record.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
+            writeStatus.markFailure(record, failureEx, recordMetadata);
+            continue;
+          }
+          // Writes the record
+          try {
+            writeToFile(recordKey, record, writeSchemaWithMetaFields,
+                config.getPayloadConfig().getProps(), preserveMetadata);
+            writeStatus.markSuccess(record, recordMetadata);
+          } catch (Exception e) {
+            LOG.error("Error writing record " + record, e);
+            writeStatus.markFailure(record, e, recordMetadata);
+          }
+        }
+
+        // The stats of inserts, updates, and deletes are updated once at the end
+        // These will be set in the write stat when closing the merge handle
+        HoodieReadStats stats = fileGroupReader.getStats();
+        this.insertRecordsWritten = stats.getNumInserts();
+        this.updatedRecordsWritten = stats.getNumUpdates();
+        this.recordsDeleted = stats.getNumDeletes();
+        this.recordsWritten = stats.getNumInserts() + stats.getNumUpdates();
+      }
+    } catch (IOException e) {
+      throw new HoodieUpsertException("Failed to compact file slice: " + fileSlice, e);
+    }
+  }
+
+  /**
+   * Writes a single record to the new file.
+   *
+   * @param key                          record key
+   * @param record                       the record of {@link HoodieSparkRecord}
+   * @param schema                       record schema
+   * @param prop                         table properties
+   * @param shouldPreserveRecordMetadata should preserve meta fields or not
+   *
+   * @throws IOException
+   */
+  protected void writeToFile(HoodieKey key, HoodieSparkRecord record, Schema schema, Properties prop, boolean shouldPreserveRecordMetadata)
+      throws IOException {
+    // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
+    //       file holding this record even in cases when overall metadata is preserved
+    MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName());
+    HoodieRecord populatedRecord = record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, prop);
+
+    if (shouldPreserveRecordMetadata) {
+      fileWriter.write(key.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
+    } else {
+      fileWriter.writeWithMetadata(key, populatedRecord, writeSchemaWithMetaFields);
+    }
+  }
+
+ @Override
+ protected void writeIncomingRecords() {
+ // no operation.
+ }
+}
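A sketch of how this handle is expected to be driven, matching the HoodieSparkCopyOnWriteTable#compactUsingFileGroupReader change later in this patch; all variables are assumed to be provided by the caller.

    // Hedged sketch: construct the handle for one compaction operation, run the
    // file-group-reader-based merge, and collect the resulting write statuses.
    HoodieSparkFileGroupReaderBasedMergeHandle mergeHandle =
        new HoodieSparkFileGroupReaderBasedMergeHandle(config, instantTime, table, operation,
            taskContextSupplier, keyGeneratorOpt, readerContext, conf);
    mergeHandle.write();                        // read the file slice and write a new base file
    List<WriteStatus> writeStatuses = mergeHandle.close();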
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index eedc560bdaf..afb7cf7c72c 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.row.HoodieRowParquetConfig;
import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport;
@@ -35,12 +36,16 @@ import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.spark.sql.HoodieDataTypeUtils;
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
import java.io.OutputStream;
+import static org.apache.hudi.config.HoodieWriteConfig.AVRO_SCHEMA_STRING;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE;
+
public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
public HoodieSparkFileWriterFactory(HoodieStorage storage) {
@@ -51,6 +56,14 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
   protected HoodieFileWriter newParquetFileWriter(
       String instantTime, StoragePath path, HoodieConfig config, Schema schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
+    String writeSchema = config.getStringOrDefault(
+        WRITE_SCHEMA_OVERRIDE, config.getString(AVRO_SCHEMA_STRING));
+    if (!StringUtils.isNullOrEmpty(writeSchema)) {
+      // The parquet write legacy format property needs to be overridden
+      // if there is a decimal field of small precision, to maintain the compatibility.
+      HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty(
+          config.getProps(), new Schema.Parser().parse(writeSchema));
+    }
     boolean populateMetaFields = config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
     String compressionCodecName = config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
     // Support PARQUET_COMPRESSION_CODEC_NAME is ""
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 8848a2bb3c7..67a7658a309 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -33,6 +33,8 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -47,6 +49,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieMergeHandleFactory;
+import org.apache.hudi.io.HoodieSparkFileGroupReaderBasedMergeHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metadata.MetadataPartitionType;
@@ -77,6 +80,7 @@ import
org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
+import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -264,6 +268,18 @@ public class HoodieSparkCopyOnWriteTable<T>
     return Collections.singletonList(createHandle.close()).iterator();
   }
+  @Override
+  public List<WriteStatus> compactUsingFileGroupReader(String instantTime,
+                                                       CompactionOperation operation,
+                                                       HoodieReaderContext readerContext,
+                                                       Configuration conf) {
+    Option<BaseKeyGenerator> keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config);
+    HoodieSparkFileGroupReaderBasedMergeHandle mergeHandle = new HoodieSparkFileGroupReaderBasedMergeHandle(config,
+        instantTime, this, operation, taskContextSupplier, keyGeneratorOpt, readerContext, conf);
+    mergeHandle.write();
+    return mergeHandle.close();
+  }
+
@Override
public HoodieCleanMetadata clean(HoodieEngineContext context, String
cleanInstantTime) {
return new CleanActionExecutor<>(context, config, this, cleanInstantTime,
false).execute();
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
new file mode 100644
index 00000000000..19b2b0aa241
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.SparkFileFormatInternalRowReaderContext;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.execution.datasources.FileFormat;
+import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.util.SerializableConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ * Broadcast variable management for Spark.
+ */
+public class SparkBroadcastManager extends EngineBroadcastManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkBroadcastManager.class);
+
+ private final transient HoodieEngineContext context;
+
+ protected Option<SparkParquetReader> parquetReaderOpt = Option.empty();
+ protected Broadcast<SQLConf> sqlConfBroadcast;
+ protected Broadcast<SparkParquetReader> parquetReaderBroadcast;
+ protected Broadcast<SerializableConfiguration> configurationBroadcast;
+
+ public SparkBroadcastManager(HoodieEngineContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public void prepareAndBroadcast() {
+    if (!(context instanceof HoodieSparkEngineContext)) {
+      throw new HoodieIOException("Expected to be called using Engine's context and not local context");
+    }
+
+    HoodieSparkEngineContext hoodieSparkEngineContext = (HoodieSparkEngineContext) context;
+    SQLConf sqlConf = hoodieSparkEngineContext.getSqlContext().sessionState().conf();
+    JavaSparkContext jsc = hoodieSparkEngineContext.jsc();
+
+    boolean returningBatch = sqlConf.parquetVectorizedReaderEnabled();
+    scala.collection.immutable.Map<String, String> options =
+        scala.collection.immutable.Map$.MODULE$.<String, String>empty()
+            .$plus(new Tuple2<>(FileFormat.OPTION_RETURNING_BATCH(), Boolean.toString(returningBatch)));
+
+    // Do broadcast.
+    sqlConfBroadcast = jsc.broadcast(sqlConf);
+    configurationBroadcast = jsc.broadcast(new SerializableConfiguration(jsc.hadoopConfiguration()));
+    // Spark parquet reader has to be instantiated on the driver and broadcast to the executors
+    parquetReaderOpt = Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter().createParquetFileReader(
+        false, sqlConfBroadcast.getValue(), options, configurationBroadcast.getValue().value()));
+ parquetReaderBroadcast = jsc.broadcast(parquetReaderOpt.get());
+ }
+
+ @Override
+  public Option<HoodieReaderContext> retrieveFileGroupReaderContext(StoragePath basePath) {
+    if (parquetReaderBroadcast == null) {
+      throw new HoodieException("Spark Parquet reader broadcast is not initialized.");
+ }
+
+ SparkParquetReader sparkParquetReader = parquetReaderBroadcast.getValue();
+ if (sparkParquetReader != null) {
+ List<Filter> filters = new ArrayList<>();
+ return Option.of(new SparkFileFormatInternalRowReaderContext(
+ sparkParquetReader,
+ JavaConverters.asScalaBufferConverter(filters).asScala().toSeq(),
+ JavaConverters.asScalaBufferConverter(filters).asScala().toSeq()));
+ } else {
+ throw new HoodieException("Cannot get the broadcast Spark Parquet
reader.");
+ }
+ }
+
+ @Override
+ public Option<Configuration> retrieveStorageConfig() {
+ return Option.of(configurationBroadcast.getValue().value());
+ }
+}
\ No newline at end of file
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index d47eb6d33aa..bac394a8da5 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -27,8 +27,11 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.EngineBroadcastManager;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.SparkBroadcastManager;
import static
org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
@@ -41,6 +44,11 @@ import static
org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVE
public class HoodieSparkMergeOnReadTableCompactor<T>
extends HoodieCompactor<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
+ @Override
+  public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context) {
+ return Option.of(new SparkBroadcastManager(context));
+ }
+
@Override
public void preCompact(
HoodieTable table, HoodieTimeline pendingCompactionTimeline,
WriteOperationType operationType, String instantTime) {
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 00e14e06bf6..06d13e12a01 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -20,8 +20,8 @@
package org.apache.hudi
import
org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap,
getAppliedRequiredSchema}
-import org.apache.hudi.avro.AvroSchemaUtils.isNullable
import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
+import org.apache.hudi.avro.AvroSchemaUtils.isNullable
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
@@ -45,7 +45,7 @@ import
org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Sp
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField,
StructType}
-import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.mutable
@@ -59,12 +59,10 @@ import scala.collection.mutable
  * @param parquetFileReader A reader that transforms a [[PartitionedFile]] to an iterator of
  *                          [[InternalRow]]. This is required for reading the base file and
  *                          not required for reading a file group with only log files.
- * @param recordKeyColumn   column name for the recordkey
  * @param filters           spark filters that might be pushed down into the reader
  * @param requiredFilters   filters that are required and should always be used, even in merging situations
  */
 class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetReader,
-                                              recordKeyColumn: String,
                                               filters: Seq[Filter],
                                               requiredFilters: Seq[Filter])
   extends BaseSparkInternalRowReaderContext {
lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 72bd8950f01..06380d36419 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -1488,4 +1488,31 @@ public class HoodieAvroUtils {
}
}
+  /**
+   * Returns whether the schema contains types not supported by the file group reader.
+   * Right now, only the ENUM type in Avro has known issues.
+   *
+   * @param schema Avro schema
+   *
+   * @return whether the schema contains types not supported by the file group reader.
+   */
+  public static boolean containsUnsupportedTypesForFileGroupReader(Schema schema) {
+    switch (schema.getType()) {
+      case RECORD:
+        for (Field field : schema.getFields()) {
+          if (containsUnsupportedTypesForFileGroupReader(field.schema())) {
+            return true;
+          }
+        }
+        return false;
+      case ARRAY:
+        return containsUnsupportedTypesForFileGroupReader(schema.getElementType());
+      case MAP:
+        return containsUnsupportedTypesForFileGroupReader(schema.getValueType());
+      case UNION:
+        return containsUnsupportedTypesForFileGroupReader(getActualSchemaFromUnion(schema, null));
+      default:
+        return schema.getType() == Schema.Type.ENUM;
+    }
+  }
}
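For illustration only (not part of the patch): a schema containing an Avro ENUM field makes this check return true, which keeps such tables on the existing compaction path.

    // Hypothetical record schema with one enum field.
    Schema schemaWithEnum = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
            + "{\"name\":\"color\",\"type\":{\"type\":\"enum\",\"name\":\"Color\","
            + "\"symbols\":[\"RED\",\"GREEN\"]}}]}");
    boolean unsupported = HoodieAvroUtils.containsUnsupportedTypesForFileGroupReader(schemaWithEnum); // true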
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index b16f9330292..8f5e7ebaa22 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -67,6 +67,10 @@ public abstract class HoodieEngineContext {
public abstract <T> HoodieData<T> emptyHoodieData();
+ public boolean supportsFileGroupReader() {
+ return false;
+ }
+
public <T> HoodieData<T> parallelize(List<T> data) {
if (data.isEmpty()) {
return emptyHoodieData();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
index 6d7ca6d5182..5a415847277 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.model;
+import org.apache.hudi.common.table.read.HoodieReadStats;
import org.apache.hudi.common.util.JsonUtils;
import org.apache.hudi.storage.StoragePath;
@@ -29,7 +30,7 @@ import java.util.Map;
/**
* Statistics about a single Hoodie write operation.
*/
-public class HoodieWriteStat implements Serializable {
+public class HoodieWriteStat extends HoodieReadStats {
public static final String NULL_COMMIT = "null";
@@ -55,25 +56,15 @@ public class HoodieWriteStat implements Serializable {
/**
   * Total number of records written for this file. - for updates, its the entire number of records in the file - for
- * inserts, its the actual number of records inserted.
+ * inserts, it's the actual number of records inserted.
*/
private long numWrites;
- /**
- * Total number of records deleted.
- */
- private long numDeletes;
-
/**
* Total number of records actually changed. (0 for inserts)
*/
private long numUpdateWrites;
- /**
-   * Total number of insert records or converted to updates (for small file handling).
- */
- private long numInserts;
-
/**
* Total number of bytes written.
*/
@@ -221,18 +212,10 @@ public class HoodieWriteStat implements Serializable {
return numWrites;
}
- public long getNumDeletes() {
- return numDeletes;
- }
-
public long getNumUpdateWrites() {
return numUpdateWrites;
}
- public long getNumInserts() {
- return numInserts;
- }
-
public String getFileId() {
return fileId;
}
@@ -418,6 +401,7 @@ public class HoodieWriteStat implements Serializable {
* The runtime stats for writing operation.
*/
public static class RuntimeStats implements Serializable {
+ private static final long serialVersionUID = 1L;
/**
* Total time taken to read and merge logblocks in a log file.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index e0014460916..e4e0299ec86 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -27,6 +27,7 @@ import
org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -71,6 +72,8 @@ import static
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BAS
import static
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
import static
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
import static
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA;
+import static
org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
import static
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
@@ -87,6 +90,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
protected final Option<String> payloadClass;
protected final TypedProperties props;
protected final ExternalSpillableMap<Serializable, Pair<Option<T>,
Map<String, Object>>> records;
+ protected final HoodieReadStats readerStats;
protected ClosableIterator<T> baseFileIterator;
protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
protected T nextRecord;
@@ -123,6 +127,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT));
boolean isBitCaskDiskMapCompressionEnabled =
props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
+ this.readerStats = new HoodieReadStats();
try {
// Store merged records for all versions for this log file, set the
in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes,
spillableMapBasePath, new DefaultSizeEstimator<>(),
@@ -137,6 +142,11 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
this.baseFileIterator = baseFileIterator;
}
+ @Override
+ public HoodieReadStats getStats() {
+ return readerStats;
+ }
+
/**
* This allows hasNext() to be called multiple times without incrementing
the iterator by more than 1
* record. It does come with the caveat that hasNext() must be called every
time before next(). But
@@ -185,9 +195,10 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
* @return
* @throws IOException
*/
-  protected Option<Pair<T, Map<String, Object>>> doProcessNextDataRecord(T record,
-                                                                         Map<String, Object> metadata,
-                                                                         Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws IOException {
+  protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T record,
+                                                                                 Map<String, Object> metadata,
+                                                                                 Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair)
+      throws IOException {
if (existingRecordMetadataPair != null) {
if (enablePartialMerging) {
// TODO(HUDI-7843): decouple the merging logic from the merger
@@ -212,7 +223,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
// If pre-combine returns existing record, no need to update it
if (combinedRecord.getData() !=
existingRecordMetadataPair.getLeft().get()) {
return Option.of(Pair.of(
- combinedRecord.getData(),
+ Option.ofNullable(combinedRecord.getData()),
readerContext.updateSchemaAndResetOrderingValInMetadata(metadata,
combinedRecordAndSchema.getRight())));
}
return Option.empty();
@@ -230,7 +241,8 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
Comparable incomingOrderingValue = readerContext.getOrderingValue(
Option.of(record), metadata, readerSchema, orderingFieldName,
orderingFieldTypeOpt, orderingFieldDefault);
if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
- return Option.of(Pair.of(record, metadata));
+ return Option.of(Pair.of(isDeleteRecord(Option.of(record),
(Schema) metadata.get(INTERNAL_META_SCHEMA))
+ ? Option.empty() : Option.of(record), metadata));
}
return Option.empty();
case CUSTOM:
@@ -245,7 +257,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
           T combinedRecordData = readerContext.convertAvroRecord((IndexedRecord) combinedRecordAndSchemaOpt.get().getLeft().getData());
           // If pre-combine does not return existing record, update it
           if (combinedRecordData != existingRecordMetadataPair.getLeft().get()) {
-            return Option.of(Pair.of(combinedRecordData, metadata));
+            return Option.of(Pair.of(Option.ofNullable(combinedRecordData), metadata));
}
}
@@ -268,7 +280,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
         // If pre-combine returns existing record, no need to update it
         if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) {
-          return Option.of(Pair.of(combinedRecord.getData(), metadata));
+          return Option.of(Pair.of(Option.ofNullable(combinedRecord.getData()), metadata));
}
return Option.empty();
@@ -280,7 +292,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
// NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we
get a clean copy of
// it since these records will be put into records(Map).
- return Option.of(Pair.of(record, metadata));
+ return Option.of(Pair.of(Option.ofNullable(record), metadata));
}
}
@@ -379,7 +391,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
   protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap, Option<T> newer, Map<String, Object> newerInfoMap)
       throws IOException {
     if (!older.isPresent()) {
-      return newer;
+      return isDeleteRecord(newer, (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA)) ? Option.empty() : newer;
     }
if (enablePartialMerging) {
@@ -401,22 +413,20 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
     } else {
       switch (recordMergeMode) {
         case COMMIT_TIME_ORDERING:
-          return newer;
+          return isDeleteRecord(newer, (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA)) ? Option.empty() : newer;
         case EVENT_TIME_ORDERING:
-          Comparable oldOrderingValue = readerContext.getOrderingValue(
-              older, olderInfoMap, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
-          if (isDeleteRecordWithNaturalOrder(older, oldOrderingValue)) {
-            return newer;
-          }
           Comparable newOrderingValue = readerContext.getOrderingValue(
               newer, newerInfoMap, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
           if (isDeleteRecordWithNaturalOrder(newer, newOrderingValue)) {
             return Option.empty();
           }
-          if (oldOrderingValue.compareTo(newOrderingValue) > 0) {
-            return older;
+          Comparable oldOrderingValue = readerContext.getOrderingValue(
+              older, olderInfoMap, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
+          if (!isDeleteRecordWithNaturalOrder(older, oldOrderingValue)
+              && oldOrderingValue.compareTo(newOrderingValue) > 0) {
+            return isDeleteRecord(older, (Schema) olderInfoMap.get(INTERNAL_META_SCHEMA)) ? Option.empty() : older;
           }
-          return newer;
+          return isDeleteRecord(newer, (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA)) ? Option.empty() : newer;
         case CUSTOM:
         default:
if (payloadClass.isPresent()) {
@@ -495,14 +505,24 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
     Map<String, Object> metadata = readerContext.generateMetadataForRecord(
         baseRecord, readerSchema);
-    Option<T> resultRecord = logRecordInfo != null
-        ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight())
-        : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
-    if (resultRecord.isPresent()) {
-      nextRecord = readerContext.seal(resultRecord.get());
-      return true;
+    if (logRecordInfo != null) {
+      Option<T> resultRecord = merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight());
+      if (resultRecord.isPresent()) {
+        // Updates
+        nextRecord = readerContext.seal(resultRecord.get());
+        readerStats.incrementNumUpdates();
+        return true;
+      } else {
+        // Deletes
+        readerStats.incrementNumDeletes();
+        return false;
+      }
     }
-    return false;
+
+    // Inserts
+    nextRecord = readerContext.seal(baseRecord);
+    readerStats.incrementNumInserts();
+    return true;
   }
protected boolean hasNextLogRecord() throws IOException {
@@ -542,4 +562,18 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
Comparable orderingValue) {
return rowOption.isEmpty() && orderingValue.equals(orderingFieldDefault);
}
+
+ private boolean isDeleteRecord(Option<T> record, Schema schema) {
+ if (record.isEmpty()) {
+ return true;
+ }
+
+    Object operation = readerContext.getValue(record.get(), schema, OPERATION_METADATA_FIELD);
+    if (operation != null && HoodieOperation.isDeleteRecord(operation.toString())) {
+ return true;
+ }
+
+    Object deleteMarker = readerContext.getValue(record.get(), schema, HOODIE_IS_DELETED_FIELD);
+ return deleteMarker instanceof Boolean && (boolean) deleteMarker;
+ }
}
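For readers tracing the merge-path changes above: a minimal, self-contained sketch of the delete-aware, event-time-ordering resolution they implement. The record shape, ordering value, and delete flag below are simplified stand-ins (java.util.Optional instead of Hudi's Option, a plain long instead of the reader-context ordering lookup); this is not the actual HoodieBaseFileGroupRecordBuffer API.

import java.util.Optional;

final class EventTimeMergeSketch {

  // Simplified stand-in for a merged record: only what the merge decision needs.
  static final class Rec {
    final long orderingValue;  // event-time / precombine value
    final boolean isDelete;    // e.g. _hoodie_is_deleted or a delete operation marker
    Rec(long orderingValue, boolean isDelete) {
      this.orderingValue = orderingValue;
      this.isDelete = isDelete;
    }
    @Override public String toString() {
      return (isDelete ? "DELETE@" : "DATA@") + orderingValue;
    }
  }

  // The side with the larger ordering value wins (ties go to the newer side);
  // a winning delete is surfaced as empty so the key is dropped instead of emitted.
  static Optional<Rec> merge(Optional<Rec> older, Optional<Rec> newer) {
    if (!older.isPresent()) {
      return newer.filter(r -> !r.isDelete);
    }
    if (!newer.isPresent()) {
      return Optional.empty();
    }
    Rec winner = older.get().orderingValue > newer.get().orderingValue ? older.get() : newer.get();
    return winner.isDelete ? Optional.empty() : Optional.of(winner);
  }

  public static void main(String[] args) {
    // A newer delete with a higher ordering value removes the key.
    System.out.println(merge(Optional.of(new Rec(1, false)), Optional.of(new Rec(2, true))));
    // An older record with a higher ordering value survives a late-arriving update.
    System.out.println(merge(Optional.of(new Rec(5, false)), Optional.of(new Rec(2, false))));
  }
}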
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index ac34efb0ab2..5bac955e22d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.CachingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.EmptyIterator;
@@ -236,6 +237,12 @@ public final class HoodieFileGroupReader<T> implements Closeable {
}
}
+ public HoodieReadStats getStats() {
+ ValidationUtils.checkArgument(recordBuffer != null,
+ "Only support getting reader stats from log merging now");
+ return recordBuffer.getStats();
+ }
+
/**
* @return The next record after calling {@link #hasNext}.
*/
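As a rough usage sketch for the new accessor (assuming a HoodieFileGroupReader<T> that has already been constructed and initialized elsewhere; the builder arguments are omitted), the stats are read after the merged iteration completes:

import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.table.read.HoodieReadStats;

import java.io.IOException;

final class ReadStatsUsageSketch {
  // Drains a reader and reports the insert/update/delete counts collected during log merging.
  static <T> HoodieReadStats drainAndReport(HoodieFileGroupReader<T> reader) throws IOException {
    long emitted = 0;
    try {
      while (reader.hasNext()) {
        reader.next();  // consume the merged record
        emitted++;
      }
      HoodieReadStats stats = reader.getStats();
      System.out.printf("emitted=%d inserts=%d updates=%d deletes=%d%n",
          emitted, stats.getNumInserts(), stats.getNumUpdates(), stats.getNumDeletes());
      return stats;
    } finally {
      reader.close();
    }
  }
}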
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
index d9ba8bcd90e..139c3529475 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -107,6 +107,11 @@ public interface HoodieFileGroupRecordBuffer<T> {
*/
void setBaseFileIterator(ClosableIterator<T> baseFileIterator);
+ /**
+ * @return statistics of log merging.
+ */
+ HoodieReadStats getStats();
+
/**
* Check if next merged record exists.
*
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index 82e976f4b27..97712fe6be7 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -86,11 +86,14 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
@Override
  public void processNextDataRecord(T record, Map<String, Object> metadata, Serializable recordKey) throws IOException {
    Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordKey);
- Option<Pair<T, Map<String, Object>>> mergedRecordAndMetadata =
+ Option<Pair<Option<T>, Map<String, Object>>> mergedRecordAndMetadata =
doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
+
if (mergedRecordAndMetadata.isPresent()) {
records.put(recordKey, Pair.of(
-          Option.ofNullable(readerContext.seal(mergedRecordAndMetadata.get().getLeft())),
+          mergedRecordAndMetadata.get().getLeft().isPresent()
+              ? Option.ofNullable(readerContext.seal(mergedRecordAndMetadata.get().getLeft().get()))
+              : Option.empty(),
mergedRecordAndMetadata.get().getRight()));
}
}
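A small illustration of the convention this hunk adopts for the key-based buffer: the merged value is sealed only when a record is present, while an empty value is kept in the map as an explicit delete marker for that key. The sketch uses java.util.Optional and a toy seal function rather than the Hudi reader context.

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;

final class BufferedDeleteMarkerSketch {
  // key -> optional payload; Optional.empty() means the latest merge result for the key is a delete.
  private final Map<String, Optional<String>> records = new HashMap<>();

  void bufferMergeResult(String key, Optional<String> merged, UnaryOperator<String> seal) {
    // Seal (copy) only when there is a payload; keep empty as a delete marker.
    records.put(key, merged.isPresent() ? Optional.of(seal.apply(merged.get())) : Optional.empty());
  }

  public static void main(String[] args) {
    BufferedDeleteMarkerSketch buffer = new BufferedDeleteMarkerSketch();
    buffer.bufferMergeResult("key1", Optional.of(" payload "), String::trim);  // update wins
    buffer.bufferMergeResult("key2", Optional.empty(), String::trim);          // delete wins
    System.out.println(buffer.records);  // e.g. {key1=Optional[payload], key2=Optional.empty}
  }
}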
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java
new file mode 100644
index 00000000000..d3617dba499
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Serializable;
+
+/**
+ * Statistics about a single Hoodie read operation.
+ */
+@NotThreadSafe
+public class HoodieReadStats implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+  // Total number of insert records or converted to updates (for small file handling)
+ protected long numInserts;
+ // Total number of updates
+ private long numUpdates = 0L;
+ // Total number of records deleted
+ protected long numDeletes;
+
+ public HoodieReadStats() {
+ }
+
+ public HoodieReadStats(long numInserts, long numUpdates, long numDeletes) {
+ this.numInserts = numInserts;
+ this.numUpdates = numUpdates;
+ this.numDeletes = numDeletes;
+ }
+
+ public long getNumInserts() {
+ return numInserts;
+ }
+
+ public long getNumUpdates() {
+ return numUpdates;
+ }
+
+ public long getNumDeletes() {
+ return numDeletes;
+ }
+
+ public void incrementNumInserts() {
+ numInserts++;
+ }
+
+ public void incrementNumUpdates() {
+ numUpdates++;
+ }
+
+ public void incrementNumDeletes() {
+ numDeletes++;
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
index dd8d352f132..6c3e2a03c56 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -212,6 +212,9 @@ public class HoodieTestReaderContext extends HoodieReaderContext<IndexedRecord>
String fieldName
) {
Schema.Field field = recordSchema.getField(fieldName);
+ if (field == null) {
+ return null;
+ }
int pos = field.pos();
return record.get(pos);
}
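The guard added here (and the analogous one in ObjectInspectorCache below) protects lookups of optional meta columns such as the operation and _hoodie_is_deleted fields, which may be absent from a given schema. A standalone Avro version of the same pattern, written as a hypothetical helper rather than the Hudi API:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

final class NullSafeFieldLookup {
  // Returns the field value, or null when the schema has no such field,
  // instead of throwing a NullPointerException on field.pos().
  static Object getValueOrNull(GenericRecord record, Schema schema, String fieldName) {
    Schema.Field field = schema.getField(fieldName);
    if (field == null) {
      return null;
    }
    return record.get(field.pos());
  }
}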
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
index 277d0305c4e..c8c3a956f55 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
@@ -23,6 +23,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -82,7 +83,11 @@ public class ObjectInspectorCache {
  public Object getValue(ArrayWritable record, Schema schema, String fieldName) {
ArrayWritableObjectInspector objectInspector = getObjectInspector(schema);
-    return objectInspector.getStructFieldData(record, objectInspector.getStructFieldRef(fieldName));
+ StructField structFieldRef = objectInspector.getStructFieldRef(fieldName);
+ if (structFieldRef == null) {
+ return null;
+ }
+ return objectInspector.getStructFieldData(record, structFieldRef);
}
public ArrayWritableObjectInspector getObjectInspector(Schema schema) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 84df68058bd..65d6d42b8ea 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.datasources.parquet
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex,
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping,
HoodieTableSchema, HoodieTableState, SparkAdapterSupport,
SparkFileFormatInternalRowReaderContext}
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation,
HoodieCDCFileGroupSplit}
@@ -31,6 +29,8 @@ import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.storage.StorageConfiguration
import org.apache.hudi.storage.hadoop.{HadoopStorageConfiguration,
HoodieHadoopStorage}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector,
OnHeapColumnVector}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.PARQUET_VECTORIZED_READER_ENABLED
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchUtils}
@@ -163,7 +162,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file))
fileSliceMapping.getSlice(filegroupName) match {
        case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || fileSlice.getLogFiles.findAny().isPresent) =>
-          val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, tableState.recordKeyField, filters, requiredFilters)
+          val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, filters, requiredFilters)
val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
.builder().setConf(storageConf).setBasePath(tableState.tablePath).build
val props = metaClient.getTableConfig.getProps
diff --git a/hudi-spark-datasource/hudi-spark/pom.xml
b/hudi-spark-datasource/hudi-spark/pom.xml
index c723b03ccc9..01b4c8d10d9 100644
--- a/hudi-spark-datasource/hudi-spark/pom.xml
+++ b/hudi-spark-datasource/hudi-spark/pom.xml
@@ -304,6 +304,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${zk-curator.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
similarity index 95%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 9f0e7547304..ee8e8d6512d 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -7,13 +7,14 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.hudi.table.action.compact;
@@ -45,8 +46,8 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -237,8 +238,10 @@ public class TestAsyncCompaction extends CompactionTestBase {
// validate the compaction plan does not include pending log files.
HoodieCompactionPlan compactionPlan =
TimelineMetadataUtils.deserializeCompactionPlan(
-        metaClient.reloadActiveTimeline().readCompactionPlanAsBytes(INSTANT_GENERATOR.getCompactionRequestedInstant(compactionInstantTime)).get());
-    assertTrue(compactionPlan.getOperations().stream().noneMatch(op -> op.getDeltaFilePaths().stream().anyMatch(deltaFile -> deltaFile.contains(pendingInstantTime))),
+        metaClient.reloadActiveTimeline()
+            .readCompactionPlanAsBytes(INSTANT_GENERATOR.getCompactionRequestedInstant(compactionInstantTime)).get());
+    assertTrue(compactionPlan.getOperations().stream()
+        .noneMatch(op -> op.getDeltaFilePaths().stream().anyMatch(deltaFile -> deltaFile.contains(pendingInstantTime))),
"compaction plan should not include pending log files");
// execute inflight compaction.
@@ -329,11 +332,13 @@ public class TestAsyncCompaction extends CompactionTestBase {
assertNull(tryScheduleCompaction(compactionInstantTime, client, cfg),
"Compaction Instant can be scheduled with older timestamp");
// Schedule with timestamp same as that of committed instant
-    assertNull(tryScheduleCompaction(secondInstantTime, client, cfg), "Compaction Instant to be scheduled can have same timestamp as committed instant");
+    assertNull(tryScheduleCompaction(secondInstantTime, client, cfg),
+        "Compaction Instant to be scheduled can have same timestamp as committed instant");
final String compactionInstantTime2 = client.createNewInstantTime();
// Schedule compaction but do not run them
-    assertNotNull(tryScheduleCompaction(compactionInstantTime2, client, cfg), "Compaction Instant can be scheduled with greater timestamp");
+ assertNotNull(tryScheduleCompaction(compactionInstantTime2, client, cfg),
+ "Compaction Instant can be scheduled with greater timestamp");
}
@Test
@@ -407,7 +412,8 @@ public class TestAsyncCompaction extends CompactionTestBase {
metaClient.reloadActiveTimeline();
HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
-    assertEquals(compactionInstantTime, pendingCompactionInstant.requestedTime(), "Pending Compaction instant has expected instant time");
+    assertEquals(compactionInstantTime, pendingCompactionInstant.requestedTime(),
+ "Pending Compaction instant has expected instant time");
Set<HoodieFileGroupId> fileGroupsBeforeReplace =
getAllFileGroups(hoodieTable, dataGen.getPartitionPaths());
// replace by using insertOverwrite
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
similarity index 97%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 29232732c15..c32d0c6739a 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -7,13 +7,14 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.hudi.table.action.compact;
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
similarity index 99%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 552f2775494..f8686dff0e2 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -80,8 +80,8 @@ import java.util.stream.Stream;
 import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
-import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -622,7 +622,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
.map(stat -> stat.getNumWrites() + stat.getNumUpdateWrites())
.reduce(0L, Long::sum);
}
-
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
similarity index 100%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index db3661cb648..2f6f3530213 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -87,7 +87,7 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
    val reader = sparkAdapter.createParquetFileReader(vectorized = false, spark.sessionState.conf, Map.empty, storageConf.unwrapAs(classOf[Configuration]))
    val metaClient = HoodieTableMetaClient.builder().setConf(storageConf).setBasePath(tablePath).build
val recordKeyField = metaClient.getTableConfig.getRecordKeyFields.get()(0)
-    new SparkFileFormatInternalRowReaderContext(reader, recordKeyField, Seq.empty, Seq.empty)
+ new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty)
}
override def commitToTable(recordList: util.List[HoodieRecord[_]],
operation: String, options: util.Map[String, String]): Unit = {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
index e10476ed4de..9e9dd995156 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
@@ -92,7 +92,7 @@ class TestSpark35RecordPositionMetadataColumn extends SparkClientFunctionalTestH
assertFalse(allBaseFiles.isEmpty)
    val requiredSchema =
      SparkFileFormatInternalRowReaderContext.getAppliedRequiredSchema(dataSchema,
-      new SparkFileFormatInternalRowReaderContext(reader, "userid", Seq.empty, Seq.empty).supportsParquetRowIndex)
+      new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty).supportsParquetRowIndex)
// Confirm if the schema is as expected.
if (HoodieSparkUtils.gteqSpark3_5) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
index 293208d8ca6..053446fb7c2 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
@@ -33,6 +33,7 @@ import org.apache.hudi.functional.ColumnStatIndexTestBase.{ColumnStatsTestCase,
import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY
import org.apache.hudi.metadata.HoodieMetadataFileSystemView
import org.apache.hudi.util.JavaConversions
+
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression, GreaterThan, Literal}
import org.apache.spark.sql.types.StringType
@@ -42,6 +43,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import java.io.File
+
import scala.collection.JavaConverters._
class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
@@ -145,7 +147,8 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
PRECOMBINE_FIELD.key -> "c1",
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
-      DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL
+      DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+ HoodieWriteConfig.RECORD_MERGE_MODE.key -> "COMMIT_TIME_ORDERING"
) ++ metadataOpts
setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true)
val lastDf = dfList.last
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 9e0c933c01e..7dbe6f840fa 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -19,22 +19,22 @@
package org.apache.hudi.functional
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
-import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, lit}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
@@ -136,7 +136,6 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
@Test
def testMergeOnReadStorageDefaultCompaction(): Unit = {
- val preCombineField = "fare"
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
@@ -151,7 +150,6 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
)
var options: Map[String, String] = commonOpts
-    options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> preCombineField)
val dataGen = new HoodieTestDataGenerator(0xDEEF)
    val fs = HadoopFSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
// Bulk Insert Operation
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
index 82b6e0d23a9..be9b4354d8b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
@@ -17,25 +17,25 @@
package org.apache.spark.sql.hudi.dml
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.{HoodieCommonConfig,
HoodieMetadataConfig, HoodieReaderConfig, HoodieStorageConfig}
import org.apache.hudi.common.engine.HoodieLocalEngineContext
-import org.apache.hudi.common.function.SerializableFunctionUnchecked
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.log.HoodieLogFileReader
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType
+import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.table.view.{FileSystemViewManager,
FileSystemViewStorageConfig, SyncableFileSystemView}
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestUtils
-import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.common.util.CompactionUtils
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.metadata.HoodieTableMetadata
-import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.avro.Schema
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import java.util.function.Predicate
import java.util.{Collections, List, Optional}
import scala.collection.JavaConverters._
@@ -238,6 +238,25 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
if (tableType.equals("mor")) {
validateLogBlock(basePath, 2, Seq(Seq("price", "_ts"), Seq("_ts",
"description")), true)
+
+      spark.sql(s"set ${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key} = 3")
+ // Partial updates that trigger compaction
+ spark.sql(
+ s"""
+ |merge into $tableName t0
+ |using ( select 2 as id, '_a2' as name, 18.0 as price, 1275 as ts
+           |union select 3 as id, '_a3' as name, 28.0 as price, 1280 as ts) s0
+ |on t0.id = s0.id
+ |when matched then update set price = s0.price, _ts = s0.ts
+ |""".stripMargin)
+ validateCompactionExecuted(basePath)
+      checkAnswer(s"select id, name, price, _ts, description from $tableName")(
+ Seq(1, "a1", 12.0, 1023, "a1: updated desc1"),
+ Seq(2, "a2", 18.0, 1275, "a2: updated desc2"),
+ Seq(3, "a3", 28.0, 1280, "a3: desc3")
+ )
+      spark.sql(s"set ${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key}"
+        + s" = ${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.defaultValue()}")
}
if (tableType.equals("cow")) {
@@ -422,4 +441,13 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
assertEquals(expectedSchema, actualSchema)
}
}
+
+ def validateCompactionExecuted(basePath: String): Unit = {
+ val storageConf = HoodieTestUtils.getDefaultStorageConf
+    val metaClient: HoodieTableMetaClient =
+      HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build
+    val lastCommit = metaClient.getActiveTimeline.getCommitsTimeline.lastInstant().get()
+ assertEquals(HoodieTimeline.COMMIT_ACTION, lastCommit.getAction)
+ CompactionUtils.getCompactionPlan(metaClient, lastCommit.requestedTime())
+ }
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
index d5a14cd4a4c..c1d8b16b27f 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -104,6 +104,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends HoodieDeltaStreamerT
props.setProperty("hoodie.write.lock.provider",
"org.apache.hudi.client.transaction.lock.InProcessLockProvider");
props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+ props.setProperty("hoodie.merge.use.record.positions", "false");
UtilitiesTestBase.Helpers.savePropsToDFS(props, storage, propsFilePath);
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;