This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 671bc9f10d75 [HUDI-9786] Handle getProgress for log files only splits in Record Reader (#13855)
671bc9f10d75 is described below
commit 671bc9f10d751bd0aafa2798ecaa62921340f353
Author: vamsikarnika <[email protected]>
AuthorDate: Wed Sep 10 05:35:32 2025 +0530
[HUDI-9786] Handle getProgress for log files only splits in Record Reader (#13855)
Co-authored-by: Vamsi <[email protected]>
---
.../hudi/hadoop/HiveHoodieReaderContext.java | 8 +---
.../HoodieFileGroupReaderBasedRecordReader.java | 38 ++++++++++++++--
.../hudi/hadoop/TestHiveHoodieReaderContext.java | 19 --------
...TestHoodieFileGroupReaderBasedRecordReader.java | 51 ++++++++++++++++++++++
4 files changed, 88 insertions(+), 28 deletions(-)
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index 03ea8a9f77f2..5e282accad8e 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -229,17 +229,13 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
if (firstRecordReader != null) {
return firstRecordReader.getPos();
}
- // for log only split, firstRecordReader is not initialized
- // we return 0 to make it consistent with non HoodieFileGroupBased Record
Readers
- return 0;
+ throw new IllegalStateException("getProgress() should not be called before
a record reader has been initialized");
}
public float getProgress() throws IOException {
if (firstRecordReader != null) {
return firstRecordReader.getProgress();
}
- // for log only split, firstRecordReader is not initialized
- // we return 0 to make it consistent with non HoodieFileGroupBased Record
Readers
- return 0;
+ throw new IllegalStateException("getProgress() should not be called before
a record reader has been initialized");
}
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
index 02bbdb11cdd2..450bb056d6bd 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
@@ -104,6 +104,7 @@ public class HoodieFileGroupReaderBasedRecordReader implements RecordReader<Null
private final InputSplit inputSplit;
private final JobConf jobConfCopy;
private final UnaryOperator<ArrayWritable> reverseProjection;
+ private final boolean containsBaseFile;
public HoodieFileGroupReaderBasedRecordReader(HiveReaderCreator
readerCreator,
final InputSplit split,
@@ -141,11 +142,13 @@ public class HoodieFileGroupReaderBasedRecordReader implements RecordReader<Null
}
}
LOG.debug("Creating HoodieFileGroupReaderRecordReader with
tableBasePath={}, latestCommitTime={}, fileSplit={}", tableBasePath,
latestCommitTime, fileSplit.getPath());
+ FileSlice fileSlice = getFileSliceFromSplit(fileSplit,
getFs(tableBasePath, jobConfCopy), tableBasePath);
+ this.containsBaseFile = fileSlice.getBaseFile().isPresent();
this.recordIterator = HoodieFileGroupReader.<ArrayWritable>newBuilder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
.withLatestCommitTime(latestCommitTime)
- .withFileSlice(getFileSliceFromSplit(fileSplit, getFs(tableBasePath,
jobConfCopy), tableBasePath))
+ .withFileSlice(fileSlice)
.withDataSchema(tableSchema)
.withRequestedSchema(requestedSchema)
.withProps(props)
@@ -161,6 +164,25 @@ public class HoodieFileGroupReaderBasedRecordReader implements RecordReader<Null
this.reverseProjection =
HoodieArrayWritableAvroUtils.getReverseProjection(requestedSchema,
outputSchema);
}
+ @VisibleForTesting
+ HoodieFileGroupReaderBasedRecordReader(
+ HiveHoodieReaderContext readerContext,
+ ClosableIterator<ArrayWritable> recordIterator,
+ ArrayWritable arrayWritable,
+ InputSplit inputSplit,
+ JobConf jobConf,
+ UnaryOperator<ArrayWritable> reverseProjection,
+ boolean containsBaseFile
+ ) {
+ this.readerContext = readerContext;
+ this.recordIterator = recordIterator;
+ this.arrayWritable = arrayWritable;
+ this.inputSplit = inputSplit;
+ this.jobConfCopy = jobConf;
+ this.reverseProjection = reverseProjection;
+ this.containsBaseFile = containsBaseFile;
+ }
+
@Override
public boolean next(NullWritable key, ArrayWritable value) throws
IOException {
if (!recordIterator.hasNext()) {
@@ -183,7 +205,12 @@ public class HoodieFileGroupReaderBasedRecordReader implements RecordReader<Null
@Override
public long getPos() throws IOException {
- return readerContext.getPos();
+ if (this.containsBaseFile) {
+ return readerContext.getPos();
+ }
+
+ // for log only split, we return 0 to make it consistent with non
HoodieFileGroupReader based Record Readers
+ return 0;
}
@Override
@@ -193,7 +220,12 @@ public class HoodieFileGroupReaderBasedRecordReader implements RecordReader<Null
@Override
public float getProgress() throws IOException {
- return readerContext.getProgress();
+ if (this.containsBaseFile) {
+ return readerContext.getProgress();
+ }
+
+ // for log only split, we return 0 to make it consistent with non
HoodieFileGroupReader based Record Readers
+ return 0;
}
/**
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHiveHoodieReaderContext.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHiveHoodieReaderContext.java
index 009ef570cb89..eba23a02d302 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHiveHoodieReaderContext.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -144,24 +143,6 @@ class TestHiveHoodieReaderContext {
assertTrue(((BooleanWritable) values[2]).get());
}
- @Test
- void testGetProgressWithRecordReaderNotInitialized() throws IOException {
- when(tableConfig.populateMetaFields()).thenReturn(true);
- HiveHoodieReaderContext avroReaderContext = new HiveHoodieReaderContext(
- readerCreator, Collections.emptyList(), storageConfiguration,
tableConfig);
-
- assertEquals(0, avroReaderContext.getProgress());
- }
-
- @Test
- void testGetPosWithRecordReaderNotInitialized() throws IOException {
- when(tableConfig.populateMetaFields()).thenReturn(true);
- HiveHoodieReaderContext avroReaderContext = new HiveHoodieReaderContext(
- readerCreator, Collections.emptyList(), storageConfiguration,
tableConfig);
-
- assertEquals(0, avroReaderContext.getPos());
- }
-
private static Schema getBaseSchema() {
Schema baseDataSchema = Schema.createRecord("test", null, null, false);
Schema.Field baseField1 = new Schema.Field("field_1",
Schema.create(Schema.Type.STRING));
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderBasedRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderBasedRecordReader.java
new file mode 100644
index 000000000000..07fc8ca2fd7a
--- /dev/null
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderBasedRecordReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.jupiter.api.Test;
+
+import java.util.function.UnaryOperator;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class TestHoodieFileGroupReaderBasedRecordReader {
+
+ @Test
+ void testGetProgressAndPosForLogFilesOnlyInMergeOnReadTable() throws
Exception {
+ HoodieFileGroupReaderBasedRecordReader recordReader = new
HoodieFileGroupReaderBasedRecordReader(
+ mock(HiveHoodieReaderContext.class),
+ mock(ClosableIterator.class),
+ mock(ArrayWritable.class),
+ mock(InputSplit.class),
+ mock(JobConf.class),
+ mock(UnaryOperator.class),
+ false
+ );
+
+ assertEquals(0, recordReader.getProgress());
+ assertEquals(0, recordReader.getPos());
+ }
+}