This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 66e11e5cabb [HUDI-6452] Add MOR snapshot reader to integrate with
query engines without using Hadoop APIs (#9066)
66e11e5cabb is described below
commit 66e11e5cabb85c43f7f3dffe25432a5edaec9d7d
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed Jul 5 17:11:20 2023 +0530
[HUDI-6452] Add MOR snapshot reader to integrate with query engines without
using Hadoop APIs (#9066)
Add a new `HoodieMergeOnReadSnapshotReader` that implements
`Iterator<HoodieRecord>`. It merges the base Parquet data with
Avro data in log files.
---
.../table/log/AbstractHoodieLogRecordReader.java | 2 +-
.../table/log/HoodieMergedLogRecordScanner.java | 2 +-
.../table/log/HoodieUnMergedLogRecordScanner.java | 2 +-
.../realtime/AbstractRealtimeRecordReader.java | 14 +-
.../realtime/HoodieMergeOnReadSnapshotReader.java | 216 +++++++++++++++++++++
.../HoodieMergeOnReadTableInputFormat.java | 19 +-
.../hadoop/realtime/HoodieRealtimeFileSplit.java | 14 +-
.../hudi/hadoop/utils/HoodieInputFormatUtils.java | 39 ++--
.../utils/HoodieRealtimeRecordReaderUtils.java | 11 +-
.../TestHoodieMergeOnReadSnapshotReader.java | 176 +++++++++++++++++
10 files changed, 454 insertions(+), 41 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 1e5bef103f4..78e701a07d4 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -641,7 +641,7 @@ public abstract class AbstractHoodieLogRecordReader {
*
* @param hoodieRecord Hoodie Record to process
*/
- protected abstract <T> void processNextRecord(HoodieRecord<T> hoodieRecord)
throws Exception;
+ public abstract <T> void processNextRecord(HoodieRecord<T> hoodieRecord)
throws Exception;
/**
* Process next deleted record.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index c150ba9a3e6..7b98b4cc35e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -236,7 +236,7 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
}
@Override
- protected <T> void processNextRecord(HoodieRecord<T> newRecord) throws
IOException {
+ public <T> void processNextRecord(HoodieRecord<T> newRecord) throws
IOException {
String key = newRecord.getRecordKey();
HoodieRecord<T> prevRecord = records.get(key);
if (prevRecord != null) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 51044c0814c..f62ec0febd5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -69,7 +69,7 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordReade
}
@Override
- protected <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws
Exception {
+ public <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws
Exception {
// NOTE: Record have to be cloned here to make sure if it holds low-level
engine-specific
// payload pointing into a shared, mutable (underlying) buffer we
get a clean copy of
// it since these records will be put into queue of
BoundedInMemoryExecutor.
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index 85d4a92311b..04a05a1d6f0 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -53,6 +53,8 @@ import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
+
/**
* Record Reader implementation to merge fresh avro data with base parquet
data, to support real time queries.
*/
@@ -67,10 +69,10 @@ public abstract class AbstractRealtimeRecordReader {
private Schema readerSchema;
private Schema writerSchema;
private Schema hiveSchema;
- private HoodieTableMetaClient metaClient;
+ private final HoodieTableMetaClient metaClient;
protected SchemaEvolutionContext schemaEvolutionContext;
// support merge operation
- protected boolean supportPayload = true;
+ protected boolean supportPayload;
// handle hive type to avro record
protected HiveAvroSerializer serializer;
private boolean supportTimestamp;
@@ -149,11 +151,11 @@ public abstract class AbstractRealtimeRecordReader {
partitionFields.length() > 0 ?
Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
: new ArrayList<>();
writerSchema =
HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema,
partitioningFields);
- List<String> projectionFields =
HoodieRealtimeRecordReaderUtils.orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
- jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR),
partitioningFields);
+ List<String> projectionFields =
HoodieRealtimeRecordReaderUtils.orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
EMPTY_STRING),
+ jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
EMPTY_STRING), partitioningFields);
Map<String, Field> schemaFieldsMap =
HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema);
- hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap,
jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS));
+ hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap,
jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS, EMPTY_STRING));
// TODO(vc): In the future, the reader schema should be updated based on
log files & be able
// to null out fields not present before
@@ -166,7 +168,7 @@ public abstract class AbstractRealtimeRecordReader {
}
public Schema constructHiveOrderedSchema(Schema writerSchema, Map<String,
Field> schemaFieldsMap, String hiveColumnString) {
- String[] hiveColumns = hiveColumnString.split(",");
+ String[] hiveColumns = hiveColumnString.isEmpty() ? new String[0] :
hiveColumnString.split(",");
LOG.info("Hive Columns : " + hiveColumnString);
List<Field> hiveSchemaFields = new ArrayList<>();
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
new file mode 100644
index 00000000000..1cc8bf91b25
--- /dev/null
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP;
+import static
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED;
+import static
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE;
+import static
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH;
+import static
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
+import static
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP;
+import static
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP;
+import static
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getBaseFileReader;
+import static
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes;
+import static
org.apache.hudi.internal.schema.InternalSchema.getEmptyInternalSchema;
+
+/**
+ * A merge-on-read snapshot reader that reads a base parquet file together with its log files and merges
+ * the records on the fly. Unlike {@link HoodieRealtimeRecordReader}, it does not implement Hadoop's
+ * RecordReader interface. Instead it implements {@link Iterator} over {@link HoodieRecord}s, which are
+ * {@link HoodieAvroIndexedRecord}s. This lets query engines such as Trino, which do not use Hadoop's
+ * RecordReader interface, consume merged records. The engine must support reading from iterators and
+ * Avro (de)serialization.
+ *
+ * <p>All records are materialized eagerly in the constructor into an {@link ExternalSpillableMap}.
+ * Call {@link #close()} to release the base file reader, the log record scanner and that map.
+ */
+public class HoodieMergeOnReadSnapshotReader extends AbstractRealtimeRecordReader implements Iterator<HoodieRecord>, AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeOnReadSnapshotReader.class);
+
+  private final String tableBasePath;
+  private final List<HoodieLogFile> logFilePaths;
+  private final String latestInstantTime;
+  private final Schema readerSchema;
+  private final JobConf jobConf;
+  private final HoodieMergedLogRecordScanner logRecordScanner;
+  private final HoodieFileReader baseFileReader;
+  private final Map<String, HoodieRecord> logRecordsByKey;
+  private final Iterator<HoodieRecord> recordsIterator;
+  private final ExternalSpillableMap<String, HoodieRecord> mergedRecordsByKey;
+
+  /**
+   * Creates the reader and eagerly merges the base file with its log files.
+   * An example usage is demonstrated in TestHoodieMergeOnReadSnapshotReader.
+   *
+   * @param tableBasePath     Base path of the Hudi table
+   * @param baseFilePath      Path of the base file as of the latest instant time for the split being processed
+   * @param logFilePaths      Paths of the log files as of the latest file slices pertaining to file group id of the base file
+   * @param latestInstantTime Latest instant time
+   * @param readerSchema      Schema of the reader
+   * @param jobConf           Any job configuration
+   * @param start             Start offset
+   * @param length            Length of the split
+   * @throws IOException if opening/reading the base file or converting a record fails
+   */
+  public HoodieMergeOnReadSnapshotReader(String tableBasePath,
+                                         String baseFilePath,
+                                         List<HoodieLogFile> logFilePaths,
+                                         String latestInstantTime,
+                                         Schema readerSchema,
+                                         JobConf jobConf,
+                                         long start,
+                                         long length) throws IOException {
+    super(getRealtimeSplit(tableBasePath, baseFilePath, logFilePaths, latestInstantTime, start, length, new String[0]), jobConf);
+    this.tableBasePath = tableBasePath;
+    this.logFilePaths = logFilePaths;
+    this.latestInstantTime = latestInstantTime;
+    this.readerSchema = readerSchema;
+    this.jobConf = jobConf;
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    this.logRecordScanner = getMergedLogRecordScanner();
+    LOG.debug("Time taken to scan log records: {}", timer.endTimer());
+    this.baseFileReader = getBaseFileReader(new Path(baseFilePath), jobConf);
+    this.logRecordsByKey = logRecordScanner.getRecords();
+    // Keys still present in this set after the base-file pass exist only in the log files.
+    Set<String> logRecordKeys = new HashSet<>(this.logRecordsByKey.keySet());
+    this.mergedRecordsByKey = new ExternalSpillableMap<>(
+        getMaxCompactionMemoryInBytes(jobConf),
+        jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH),
+        new DefaultSizeEstimator(),
+        new HoodieRecordSizeEstimator(readerSchema),
+        jobConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue()),
+        jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()));
+    try (ClosableIterator<String> baseFileIterator = baseFileReader.getRecordKeyIterator()) {
+      timer.startTimer();
+      while (baseFileIterator.hasNext()) {
+        String key = baseFileIterator.next();
+        // Set#remove returns true only when the key was present, i.e. the base record has a log update.
+        if (logRecordKeys.remove(key)) {
+          putMergedRecord(key, logRecordsByKey.get(key));
+        }
+        // TODO(review): only base-file record keys are read here. As a result:
+        //  (a) base records that have no log update are not emitted, and
+        //  (b) the log record is emitted as-is instead of being combined with the base record's values.
+        //  A complete snapshot would need to iterate the base records themselves. Confirm the intended contract.
+      }
+    }
+    // Previously these log-only keys were silently dropped from the snapshot. Records whose conversion
+    // yields an empty Option are skipped, as for keys matched above.
+    for (String key : logRecordKeys) {
+      putMergedRecord(key, logRecordsByKey.get(key));
+    }
+    LOG.debug("Time taken to merge base file and log file records: {}", timer.endTimer());
+    this.recordsIterator = mergedRecordsByKey.values().iterator();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return recordsIterator.hasNext();
+  }
+
+  @Override
+  public HoodieRecord next() {
+    return recordsIterator.next();
+  }
+
+  /**
+   * Returns the merged records keyed by record key.
+   * This is the reader's internal map; its resources are released by {@link #close()}.
+   */
+  public Map<String, HoodieRecord> getRecordsByKey() {
+    return mergedRecordsByKey;
+  }
+
+  /**
+   * Returns the iterator backing {@link #hasNext()} and {@link #next()}.
+   * Advancing it also advances this reader.
+   */
+  public Iterator<HoodieRecord> getRecordsIterator() {
+    return recordsIterator;
+  }
+
+  /**
+   * Returns the records produced by the log record scanner, keyed by record key.
+   */
+  public Map<String, HoodieRecord> getLogRecordsByKey() {
+    return logRecordsByKey;
+  }
+
+  /**
+   * Converts {@code logRecord} to an Avro indexed record and stores a copy of it under {@code key}.
+   * Nothing is stored when the conversion yields an empty Option (presumably a delete; confirm).
+   */
+  private void putMergedRecord(String key, HoodieRecord logRecord) throws IOException {
+    Option<HoodieAvroIndexedRecord> mergedRecord = buildGenericRecordWithCustomPayload(logRecord);
+    if (mergedRecord.isPresent()) {
+      // Copy so the stored record does not share state with the scanner's record.
+      HoodieRecord hoodieRecord = mergedRecord.get().copy();
+      mergedRecordsByKey.put(key, hoodieRecord);
+    }
+  }
+
+  /**
+   * Builds the {@link HoodieRealtimeFileSplit} that the parent record reader is initialized with.
+   */
+  private static HoodieRealtimeFileSplit getRealtimeSplit(String tableBasePath, String baseFilePath,
+                                                          List<HoodieLogFile> logFilePaths,
+                                                          String latestInstantTime,
+                                                          long start, long length, String[] hosts) {
+    HoodieRealtimePath realtimePath = new HoodieRealtimePath(
+        new Path(baseFilePath).getParent(),
+        baseFilePath,
+        tableBasePath,
+        logFilePaths,
+        latestInstantTime,
+        false, // TODO: Fix this to support incremental queries
+        Option.empty());
+    return HoodieInputFormatUtils.createRealtimeFileSplit(realtimePath, start, length, hosts);
+  }
+
+  /**
+   * Builds a scanner that merges the records of {@link #logFilePaths} up to {@link #latestInstantTime}.
+   * Tuning options are read from {@link #jobConf}.
+   */
+  private HoodieMergedLogRecordScanner getMergedLogRecordScanner() {
+    return HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(FSUtils.getFs(split.getPath().toString(), jobConf))
+        .withBasePath(tableBasePath)
+        .withLogFilePaths(logFilePaths.stream().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
+        .withReaderSchema(readerSchema)
+        .withLatestInstantTime(latestInstantTime)
+        .withMaxMemorySizeInBytes(getMaxCompactionMemoryInBytes(jobConf))
+        .withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+        .withReverseReader(false)
+        .withBufferSize(jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withSpillableMapBasePath(jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .withDiskMapType(jobConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue()))
+        .withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
+        .withOptimizedLogBlocksScan(jobConf.getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, false))
+        .withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(getEmptyInternalSchema()))
+        .build();
+  }
+
+  /**
+   * Converts a record to an Avro indexed record.
+   * Uses the writer schema when a custom payload is in use, and the reader schema otherwise.
+   */
+  private Option<HoodieAvroIndexedRecord> buildGenericRecordWithCustomPayload(HoodieRecord record) throws IOException {
+    if (usesCustomPayload) {
+      return record.toIndexedRecord(getWriterSchema(), payloadProps);
+    } else {
+      return record.toIndexedRecord(readerSchema, payloadProps);
+    }
+  }
+
+  /**
+   * Releases the base file reader, the log record scanner and the spillable map of merged records.
+   * Nested finally blocks ensure a failure closing one resource does not leak the others.
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      if (baseFileReader != null) {
+        baseFileReader.close();
+      }
+    } finally {
+      try {
+        if (logRecordScanner != null) {
+          logRecordScanner.close();
+        }
+      } finally {
+        if (mergedRecordsByKey != null) {
+          mergedRecordsByKey.close();
+        }
+      }
+    }
+  }
+}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 51ff0c4c8f6..a5f24954c09 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -65,6 +65,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.createRealtimeFileSplit;
/**
* Base implementation of the Hive's {@link FileInputFormat} allowing for
reading of Hudi's
@@ -304,20 +305,12 @@ public class HoodieMergeOnReadTableInputFormat extends
HoodieCopyOnWriteTableInp
.collect(Collectors.toList());
}
- private static HoodieRealtimeFileSplit
createRealtimeFileSplit(HoodieRealtimePath path, long start, long length,
String[] hosts) {
- try {
- return new HoodieRealtimeFileSplit(new FileSplit(path, start, length,
hosts), path);
- } catch (IOException e) {
- throw new HoodieIOException(String.format("Failed to create instance of
%s", HoodieRealtimeFileSplit.class.getName()), e);
- }
- }
-
private static HoodieRealtimeBootstrapBaseFileSplit
createRealtimeBootstrapBaseFileSplit(BootstrapBaseFileSplit split,
-
String basePath,
-
List<HoodieLogFile> logFiles,
-
String maxInstantTime,
-
boolean belongsToIncrementalQuery,
-
Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
+
String basePath,
+
List<HoodieLogFile> logFiles,
+
String maxInstantTime,
+
boolean belongsToIncrementalQuery,
+
Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
try {
String[] hosts = split.getLocationInfo() != null ?
Arrays.stream(split.getLocationInfo())
.filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0];
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
index a424f021c2d..b45f91084c7 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
@@ -18,10 +18,11 @@
package org.apache.hudi.hadoop.realtime;
-import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
+import org.apache.hadoop.mapred.FileSplit;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -34,9 +35,9 @@ import java.util.List;
* <li>Split corresponding to the base file</li>
* <li>List of {@link HoodieLogFile} that holds the delta to be merged (upon
reading)</li>
* </ol>
- *
+ * <p>
* This split is correspondent to a single file-slice in the Hudi terminology.
- *
+ * <p>
* NOTE: If you're adding fields here you need to make sure that you
appropriately de-/serialize them
* in {@link #readFromInput(DataInput)} and {@link
#writeToOutput(DataOutput)}
*/
@@ -63,11 +64,10 @@ public class HoodieRealtimeFileSplit extends FileSplit
implements RealtimeSplit
*/
private Option<HoodieVirtualKeyInfo> virtualKeyInfo = Option.empty();
- public HoodieRealtimeFileSplit() {}
+ public HoodieRealtimeFileSplit() {
+ }
- public HoodieRealtimeFileSplit(FileSplit baseSplit,
- HoodieRealtimePath path)
- throws IOException {
+ public HoodieRealtimeFileSplit(FileSplit baseSplit, HoodieRealtimePath path)
throws IOException {
this(baseSplit,
path.getBasePath(),
path.getDeltaLogFiles(),
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 70e8f49ca51..c3984c5d171 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -41,6 +41,8 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimePath;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.slf4j.Logger;
@@ -187,16 +190,17 @@ public class HoodieInputFormatUtils {
/**
* Filter any specific instants that we do not want to process.
* example timeline:
- *
+ * <p>
* t0 -> create bucket1.parquet
* t1 -> create and append updates bucket1.log
* t2 -> request compaction
* t3 -> create bucket2.parquet
- *
+ * <p>
* if compaction at t2 takes a long time, incremental readers on RO tables
can move to t3 and would skip updates in t1
- *
+ * <p>
* To workaround this problem, we want to stop returning data belonging to
commits > t2.
* After compaction is complete, incremental reader would see updates in t2,
t3, so on.
+ *
* @param timeline
* @return
*/
@@ -220,6 +224,7 @@ public class HoodieInputFormatUtils {
/**
* Extract partitions touched by the commitsToCheck.
+ *
* @param commitsToCheck
* @param tableMetaClient
* @param timeline
@@ -228,9 +233,9 @@ public class HoodieInputFormatUtils {
* @throws IOException
*/
public static Option<String> getAffectedPartitions(List<HoodieInstant>
commitsToCheck,
- HoodieTableMetaClient tableMetaClient,
- HoodieTimeline timeline,
- List<Path> inputPaths) throws
IOException {
+ HoodieTableMetaClient
tableMetaClient,
+ HoodieTimeline timeline,
+ List<Path> inputPaths)
throws IOException {
Set<String> partitionsToList = new HashSet<>();
for (HoodieInstant commit : commitsToCheck) {
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
@@ -274,6 +279,7 @@ public class HoodieInputFormatUtils {
/**
* Extract HoodieTimeline based on HoodieTableMetaClient.
+ *
* @param job
* @param tableMetaClient
* @return
@@ -298,6 +304,7 @@ public class HoodieInputFormatUtils {
/**
* Get commits for incremental query from Hive map reduce configuration.
+ *
* @param job
* @param tableName
* @param timeline
@@ -325,6 +332,7 @@ public class HoodieInputFormatUtils {
/**
* Extract HoodieTableMetaClient by partition path.
+ *
* @param conf The hadoop conf
* @param partitions The partitions
* @return partition path to table meta client mapping
@@ -371,7 +379,7 @@ public class HoodieInputFormatUtils {
public static FileStatus getFileStatus(HoodieBaseFile baseFile) throws
IOException {
if (baseFile.getBootstrapBaseFile().isPresent()) {
if (baseFile.getFileStatus() instanceof LocatedFileStatus) {
- return new
LocatedFileStatusWithBootstrapBaseFile((LocatedFileStatus)baseFile.getFileStatus(),
+ return new LocatedFileStatusWithBootstrapBaseFile((LocatedFileStatus)
baseFile.getFileStatus(),
baseFile.getBootstrapBaseFile().get().getFileStatus());
} else {
return new FileStatusWithBootstrapBaseFile(baseFile.getFileStatus(),
@@ -383,6 +391,7 @@ public class HoodieInputFormatUtils {
/**
* Filter a list of FileStatus based on commitsToCheck for incremental view.
+ *
* @param job
* @param tableMetaClient
* @param timeline
@@ -391,7 +400,7 @@ public class HoodieInputFormatUtils {
* @return
*/
public static List<FileStatus> filterIncrementalFileStatus(Job job,
HoodieTableMetaClient tableMetaClient,
- HoodieTimeline timeline, FileStatus[] fileStatuses, List<HoodieInstant>
commitsToCheck) throws IOException {
+ HoodieTimeline
timeline, FileStatus[] fileStatuses, List<HoodieInstant> commitsToCheck) throws
IOException {
TableFileSystemView.BaseFileOnlyView roView = new
HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
List<String> commitsList =
commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
List<HoodieBaseFile> filteredFiles =
roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());
@@ -452,7 +461,7 @@ public class HoodieInputFormatUtils {
for (Path path : snapshotPaths) {
// Find meta client associated with the input path
metaClientList.stream().filter(metaClient ->
path.toString().contains(metaClient.getBasePath()))
- .forEach(metaClient -> grouped.get(metaClient).add(path));
+ .forEach(metaClient -> grouped.get(metaClient).add(path));
}
return grouped;
}
@@ -467,6 +476,7 @@ public class HoodieInputFormatUtils {
* Checks the file status for a race condition which can set the file size
to 0. 1. HiveInputFormat does
* super.listStatus() and gets back a FileStatus[] 2. Then it creates the
HoodieTableMetaClient for the paths listed.
* 3. Generation of splits looks at FileStatus size to create splits, which
skips this file
+ *
* @param conf
* @param dataFile
* @return
@@ -493,14 +503,13 @@ public class HoodieInputFormatUtils {
*
* @param basePath The table base path
* @param metadataList The metadata list to read the data from
- *
* @return the affected file status array
*/
public static FileStatus[] listAffectedFilesForCommits(Configuration
hadoopConf, Path basePath, List<HoodieCommitMetadata> metadataList) {
// TODO: Use HoodieMetaTable to extract affected file directly.
HashMap<String, FileStatus> fullPathToFileStatus = new HashMap<>();
// Iterate through the given commits.
- for (HoodieCommitMetadata metadata: metadataList) {
+ for (HoodieCommitMetadata metadata : metadataList) {
fullPathToFileStatus.putAll(metadata.getFullPathToFileStatus(hadoopConf,
basePath.toString()));
}
return fullPathToFileStatus.values().toArray(new FileStatus[0]);
@@ -518,4 +527,12 @@ public class HoodieInputFormatUtils {
.flatMap(Collection::stream)
.collect(Collectors.toSet());
}
+
+ public static HoodieRealtimeFileSplit
createRealtimeFileSplit(HoodieRealtimePath path, long start, long length,
String[] hosts) {
+ try {
+ return new HoodieRealtimeFileSplit(new FileSplit(path, start, length,
hosts), path);
+ } catch (IOException e) {
+ throw new HoodieIOException(String.format("Failed to create instance of
%s", HoodieRealtimeFileSplit.class.getName()), e);
+ }
+ }
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 6d010e12cdd..a6d1cf66acb 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -18,9 +18,12 @@
package org.apache.hudi.hadoop.utils;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.JsonProperties;
@@ -30,6 +33,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -46,6 +50,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -158,7 +163,7 @@ public class HoodieRealtimeRecordReaderUtils {
case STRING:
return new Text(value.toString());
case BYTES:
- return new BytesWritable(((ByteBuffer)value).array());
+ return new BytesWritable(((ByteBuffer) value).array());
case INT:
if (schema.getLogicalType() != null &&
schema.getLogicalType().getName().equals("date")) {
return HoodieHiveUtils.getDateWriteable((Integer) value);
@@ -297,6 +302,10 @@ public class HoodieRealtimeRecordReaderUtils {
return appendNullSchemaFields(schema, fieldsToAdd);
}
+ public static HoodieFileReader getBaseFileReader(Path path, JobConf conf)
throws IOException {
+ return
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(conf,
path);
+ }
+
private static Schema appendNullSchemaFields(Schema schema, List<String>
newFieldNames) {
List<Schema.Field> newFields = new ArrayList<>();
for (String newField : newFieldNames) {
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
new file mode 100644
index 00000000000..b37b4170a0c
--- /dev/null
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.FileCreateUtils;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.fs.FSUtils.getFs;
+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static
org.apache.hudi.hadoop.testutils.InputFormatTestUtil.writeDataBlockToLogFile;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests {@link HoodieMergeOnReadSnapshotReader}.
+ *
+ * <p>Each case builds a MERGE_ON_READ table with a single parquet base file. It then appends
+ * log files one at a time and checks the number of records the snapshot reader returns after
+ * each append.
+ */
+public class TestHoodieMergeOnReadSnapshotReader {
+
+  private static final int TOTAL_RECORDS = 100;
+  private static final String FILE_ID = "fileid0";
+  private JobConf baseJobConf;
+  private FileSystem fs;
+  private Configuration hadoopConf;
+
+  @TempDir
+  public java.nio.file.Path basePath;
+
+  /** Builds a local-filesystem Hadoop/Job configuration rooted at the JUnit temp directory. */
+  @BeforeEach
+  public void setUp() {
+    hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
+    // Force the local filesystem so that temp-dir paths resolve without any cluster setup.
+    hadoopConf.set("fs.defaultFS", "file:///");
+    hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+    baseJobConf = new JobConf(hadoopConf);
+    baseJobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024));
+    fs = getFs(basePath.toUri().toString(), baseJobConf);
+  }
+
+  /** Deletes the table directory and closes the filesystem handle obtained in {@link #setUp()}. */
+  @AfterEach
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.delete(new Path(basePath.toString()), true);
+      // NOTE(review): if getFs hands out a cached FileSystem instance, closing it here also
+      // closes it for every other holder of that cached instance - confirm this is intended.
+      fs.close();
+    }
+  }
+
+  @Test
+  public void testSnapshotReader() throws Exception {
+    testReaderInternal(false, HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
+  }
+
+  @Test
+  public void testSnapshotReaderPartitioned() throws Exception {
+    testReaderInternal(true, HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
+  }
+
+  /**
+   * Prepares a one-file-group MOR table and checks the snapshot reader after each log append.
+   *
+   * <p>After every log append, a new {@link HoodieMergeOnReadSnapshotReader} is opened over the
+   * base file and all log files added so far. The test asserts that it yields
+   * {@link #TOTAL_RECORDS} records keyed by record key.
+   *
+   * @param partitioned  whether to lay the table out with a partition directory
+   * @param logBlockType block type used for every appended log file
+   * @throws Exception if table preparation fails; failures inside the per-log-file loop are
+   *                   rethrown as {@link HoodieException}
+   */
+  private void testReaderInternal(boolean partitioned, HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception {
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
+    String baseInstant = "100";
+    File partitionDir = partitioned
+        ? InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, TOTAL_RECORDS, baseInstant,
+            HoodieTableType.MERGE_ON_READ)
+        : InputFormatTestUtil.prepareNonPartitionedParquetTable(basePath, schema, 1, TOTAL_RECORDS, baseInstant,
+            HoodieTableType.MERGE_ON_READ);
+
+    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(),
+        Option.empty(), WriteOperationType.UPSERT, schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION);
+    FileCreateUtils.createDeltaCommit(basePath.toString(), baseInstant, commitMetadata);
+    // Add the paths
+    FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
+
+    List<Pair<String, Integer>> logVersionsWithAction = new ArrayList<>();
+    logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 1));
+    logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 2));
+    String baseFilePath = partitionDir + "/" + FILE_ID + "_1-0-1_" + baseInstant + ".parquet";
+    // Non-partitioned tables use the literal "default" as the partition path of the file group id.
+    String partitionPath = partitioned
+        ? getRelativePartitionPath(new Path(basePath.toString()), new Path(partitionDir.getAbsolutePath()))
+        : "default";
+    FileSlice fileSlice = new FileSlice(
+        new HoodieFileGroupId(partitionPath, FILE_ID),
+        baseInstant,
+        new HoodieBaseFile(fs.getFileStatus(new Path(baseFilePath))),
+        new ArrayList<>());
+    logVersionsWithAction.forEach(logVersionWithAction -> {
+      try {
+        // update files or generate new log file
+        int logVersion = logVersionWithAction.getRight();
+        String action = logVersionWithAction.getKey();
+        int baseInstantTs = Integer.parseInt(baseInstant);
+        String instantTime = String.valueOf(baseInstantTs + logVersion);
+        // Only DELTA_COMMIT_ACTION entries are added above, so the rollback branch is not
+        // exercised by the current cases.
+        String latestInstant = action.equals(HoodieTimeline.ROLLBACK_ACTION)
+            ? String.valueOf(baseInstantTs + logVersion - 2)
+            : instantTime;
+
+        HoodieLogFormat.Writer writer = writeDataBlockToLogFile(
+            partitionDir,
+            fs,
+            schema,
+            FILE_ID,
+            baseInstant,
+            latestInstant,
+            120,
+            0,
+            logVersion,
+            logBlockType);
+        long size;
+        try {
+          size = writer.getCurrentSize();
+        } finally {
+          // Close the writer even if reading its size fails, so the log file handle is not leaked.
+          writer.close();
+        }
+        assertTrue(size > 0, "block - size should be > 0");
+        FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime, commitMetadata);
+        fileSlice.addLogFile(writer.getLogFile());
+
+        // NOTE(review): the log writer's size is passed as the reader's final (length) argument;
+        // confirm against the HoodieMergeOnReadSnapshotReader constructor that this is intended.
+        HoodieMergeOnReadSnapshotReader snapshotReader = new HoodieMergeOnReadSnapshotReader(
+            basePath.toString(),
+            fileSlice.getBaseFile().get().getPath(),
+            fileSlice.getLogFiles().collect(Collectors.toList()),
+            latestInstant,
+            schema,
+            baseJobConf,
+            0,
+            size);
+        try {
+          Map<String, HoodieRecord> records = snapshotReader.getRecordsByKey();
+          assertEquals(TOTAL_RECORDS, records.size());
+        } finally {
+          // An assertion failure is an Error and bypasses the catch below. Close here so the
+          // reader is released on every path.
+          snapshotReader.close();
+        }
+      } catch (Exception e) {
+        throw new HoodieException(e.getMessage(), e);
+      }
+    });
+  }
+}