yihua commented on code in PR #18375:
URL: https://github.com/apache/hudi/pull/18375#discussion_r3036286607
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSqlStatement.scala:
##########
@@ -37,15 +39,23 @@ class TestSqlStatement extends HoodieSparkSqlTestBase {
val STATE_FINISH_ALL = 12
test("Test Sql Statements") {
- Seq("cow", "mor").foreach { tableType =>
- withTempDir { tmp =>
- val params = Map(
- "tableType" -> tableType,
- "tmpDir" -> {
- tmp.getCanonicalPath.replace('\\', '/')
- }
- )
- execSqlFile("/sql-statements.sql", params)
+ val baseFileFormats = if (HoodieSparkUtils.gteqSpark3_4) Seq("parquet",
"lance") else Seq("parquet")
+ baseFileFormats.foreach { baseFileFormat =>
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val params = Map(
+ "tableType" -> tableType,
+ "baseFileFormat" -> baseFileFormat,
+ "recordMergerImpl" -> (
+ if (baseFileFormat.equals("parquet")) "" else {
+ s"hoodie.write.record.merge.custom.implementation.classes =
'${classOf[DefaultSparkRecordMerger].getName}',"
Review Comment:
🤖 nit: same as above — `baseFileFormat == "parquet"` is idiomatic Scala;
`.equals("parquet")` looks like Java style slipping in here.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -216,11 +241,19 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase
{
tableType
}
+ val recordMergerImpl = if (baseFileFormat.equals("lance")) {
Review Comment:
🤖 nit: in Scala, `==` is the idiomatic way to compare strings —
`baseFileFormat == "lance"` reads more naturally than `.equals("lance")`.
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLanceRecordReader.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.io.IOException;
+
+import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+
+public class HoodieLanceRecordReader implements RecordReader<NullWritable,
ArrayWritable> {
+
+ private long count = 0;
+ private final ArrayWritable valueObj;
+ private HoodieFileReader reader;
+ private ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator;
+ private final HoodieSchema schema;
+
+ public HoodieLanceRecordReader(Configuration conf, InputSplit split, JobConf
job) throws IOException {
+ FileSplit fileSplit = (FileSplit) split;
+ StoragePath path = convertToStoragePath(fileSplit.getPath());
+ StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf);
+ HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
+ reader = HoodieIOFactory.getIOFactory(HoodieStorageUtils.getStorage(path,
storageConf)).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+ .getFileReader(hoodieConfig, path, HoodieFileFormat.LANCE,
Option.empty());
Review Comment:
🤖 nit: this chain is pretty long for one line — it might be worth breaking
it into a local variable for the IO factory or reader factory to make it easier
to scan.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSqlStatement.scala:
##########
@@ -37,15 +39,23 @@ class TestSqlStatement extends HoodieSparkSqlTestBase {
val STATE_FINISH_ALL = 12
test("Test Sql Statements") {
- Seq("cow", "mor").foreach { tableType =>
- withTempDir { tmp =>
- val params = Map(
- "tableType" -> tableType,
- "tmpDir" -> {
- tmp.getCanonicalPath.replace('\\', '/')
- }
- )
- execSqlFile("/sql-statements.sql", params)
+ val baseFileFormats = if (HoodieSparkUtils.gteqSpark3_4) Seq("parquet",
"lance") else Seq("parquet")
+ baseFileFormats.foreach { baseFileFormat =>
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val params = Map(
+ "tableType" -> tableType,
+ "baseFileFormat" -> baseFileFormat,
+ "recordMergerImpl" -> (
Review Comment:
🤖 nit: the multi-line `if/else` expression for `recordMergerImpl` embedded
inline inside the `Map(...)` literal is a bit hard to parse at a glance — could
you assign it to a `val` before the `Map`, similar to how
`TestSparkSqlCoreFlow` handles it?
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java:
##########
@@ -49,7 +49,10 @@ public static <T, I, K, O> HoodieFileWriter getFileWriter(
String instantTime, StoragePath path, HoodieStorage storage,
HoodieConfig config, HoodieSchema schema,
TaskContextSupplier taskContextSupplier, HoodieRecordType recordType)
throws IOException {
final String extension = FSUtils.getFileExtension(path.getName());
- HoodieFileWriterFactory factory =
HoodieIOFactory.getIOFactory(storage).getWriterFactory(recordType);
+ HoodieFileFormat format =
HoodieFileFormat.fromFileExtensionOrNull(extension);
+ HoodieRecordType fixedType = (format != null &&
format.requiresSparkRecordType())
Review Comment:
🤖 nit: `fixedType` doesn't communicate much intent — something like
`effectiveRecordType` or `resolvedRecordType` would make it clearer that this
is the format-aware override of the caller-supplied type.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -396,24 +399,51 @@ private static <R> Option<HoodieRecord<R>>
mergeIncomingWithExistingRecordWithEx
}
//record is inserted or updated
- String partitionPath = inferPartitionPath(incoming, existing,
writeSchemaWithMetaFields, keyGenerator, existingRecordContext, mergeResult);
- HoodieRecord<R> result =
existingRecordContext.constructHoodieRecord(mergeResult, partitionPath);
+ String partitionPath = inferPartitionPath(incoming, existing,
writeSchemaWithMetaFields, keyGenerator,
+ existingRecordContext, mergeResult, incomingBufferedRecord,
existingBufferedRecord, incomingRecordContext);
+ // When HoodieAvroRecordMerger creates a genuinely new BufferedRecord, it
encodes the schema into
+ // incomingRecordContext. Re-encode into existingRecordContext so
constructHoodieRecord (which uses
+ // existingRecordContext for the correct payload class) can resolve the
schema for SPARK records.
+ BufferedRecord<R> mergeResultForConstruct = mergeResult;
+ if (mergeResult != incomingBufferedRecord && mergeResult !=
existingBufferedRecord) {
+ HoodieSchema mergedSchema =
incomingRecordContext.getSchemaFromBufferRecord(mergeResult);
+ if (mergedSchema != null &&
existingRecordContext.getSchemaFromBufferRecord(mergeResult) == null) {
+ mergeResultForConstruct = BufferedRecords.fromEngineRecord(
+ mergeResult.getRecord(), mergeResult.getRecordKey(), mergedSchema,
+ existingRecordContext, Arrays.asList(orderingFieldNames),
mergeResult.getHoodieOperation());
+ }
+ }
+ HoodieRecord<R> result =
existingRecordContext.constructHoodieRecord(mergeResultForConstruct,
partitionPath);
HoodieRecord<R> withMeta = result.prependMetaFields(writeSchema,
writeSchemaWithMetaFields,
new
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(partitionPath),
properties);
return
Option.of(withMeta.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields,
properties, Option.empty(),
config.allowOperationMetadataField(), Option.empty(), false,
Option.of(writeSchema)));
}
Review Comment:
🤖 nit: `inferPartitionPath` has grown to 9 parameters — might be worth
considering a small value object or passing the incoming/existing
`BufferedRecord` pair as a context struct to reduce the signature length.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -47,24 +48,47 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
val colsToCompare = "timestamp, _row_key, partition_path, rider, driver,
begin_lat, begin_lon, end_lat, end_lon, fare.amount, fare.currency,
_hoodie_is_deleted"
//params for core flow tests
- val params: List[String] = List(
-
"COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
-
"COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
-
"COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
-
"COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
-
"COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
-
"COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
-
"COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
-
"COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
-
"MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
-
"MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
-
"MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
-
"MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
-
"MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
-
"MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
-
"MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
-
"MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE"
- )
+ val params: List[String] = {
+ val allParams: List[String] = List(
+
"COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|parquet",
+
"COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|parquet",
+
"COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|parquet",
+
"COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|parquet",
+
"COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|parquet",
+
"COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|parquet",
+
"COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|parquet",
+
"COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|parquet",
+
"MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|parquet",
+
"MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|parquet",
+
"MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|parquet",
+
"MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|parquet",
+
"MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|parquet",
+
"MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|parquet",
+
"MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|parquet",
+
"MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|parquet",
+
"COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|lance",
+
"COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|lance",
+
"COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|lance",
+
"COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|lance",
+
"COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|lance",
+
"COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|lance",
+
"COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|lance",
+
"COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|lance",
+
"MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|lance",
+
"MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|lance",
+
"MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|lance",
+
"MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|lance",
+
"MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|lance",
+
"MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|lance",
+
"MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|lance",
+
"MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|lance"
Review Comment:
🤖 nit: the `if (HoodieSparkUtils.gteqSpark3_4) allParams else
allParams.filterNot(_.endsWith("|lance"))` pattern is copy-pasted verbatim for
`paramsForImmutable` too — could you pull it into a small helper like `def
withLanceIfSupported(ps: List[String]): List[String]` to keep the two call
sites DRY?
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieLanceRealtimeInputFormat.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hadoop.HoodieLanceInputFormat;
+import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
+import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * HoodieRealtimeInputFormat for HUDI datasets which store data in Lance base
file format.
+ */
+@UseRecordReaderFromInputFormat
+@UseFileSplitsFromInputFormat
+public class HoodieLanceRealtimeInputFormat extends
HoodieMergeOnReadTableInputFormat {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieLanceRealtimeInputFormat.class);
+
+ // NOTE: We're only using {@code HoodieLanceInputFormat} to compose {@code
RecordReader}
+ private final HoodieLanceInputFormat lanceInputFormat = new
HoodieLanceInputFormat();
+
+ @Override
+ public RecordReader<NullWritable, ArrayWritable> getRecordReader(final
InputSplit split, final JobConf jobConf,
+ final Reporter reporter) throws IOException {
+ // Hive on Spark invokes multiple getRecordReaders from different threads
in the same spark task (and hence the
+ // same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is
shared across all threads, is at the
+ // risk of experiencing race conditions. Hence, we synchronize on the
JobConf object here. There is negligible
latency incurred here due to the synchronization since get record
reader is called once per split before the
+ actual heavy lifting of reading the parquet files happens.
+ if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
Review Comment:
🤖 nit: the comment says "reading the parquet files" — looks like a
copy-paste artifact from the Parquet realtime format; probably should say
"Lance files" (or just "the files").
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java:
##########
@@ -70,6 +70,14 @@ public String getFileExtension() {
return extension;
}
+ /**
+ * Returns true if this file format requires the SPARK record type for
reading/writing.
+ * Lance only supports the Spark-native InternalRow representation, not Avro.
+ */
+ public boolean requiresSparkRecordType() {
Review Comment:
🤖 nit: the `requiresSparkRecordType() ? SPARK : <fallback>` ternary is
copy-pasted at least five times across this PR (HoodieIndexUtils ×3,
HoodieReadHandle, HoodieTable, HoodieFileWriterFactory) — have you considered
adding a helper like `resolveRecordType(HoodieRecordType fallback)` directly on
`HoodieFileFormat` to avoid the repetition?
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLanceRecordReader.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.io.IOException;
+
+import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+
+public class HoodieLanceRecordReader implements RecordReader<NullWritable,
ArrayWritable> {
+
+ private long count = 0;
+ private final ArrayWritable valueObj;
+ private HoodieFileReader reader;
+ private ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator;
+ private final HoodieSchema schema;
+
+ public HoodieLanceRecordReader(Configuration conf, InputSplit split, JobConf
job) throws IOException {
+ FileSplit fileSplit = (FileSplit) split;
+ StoragePath path = convertToStoragePath(fileSplit.getPath());
+ StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf);
+ HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
+ reader = HoodieIOFactory.getIOFactory(HoodieStorageUtils.getStorage(path,
storageConf)).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+ .getFileReader(hoodieConfig, path, HoodieFileFormat.LANCE,
Option.empty());
+
+ schema = reader.getSchema();
+ valueObj = new ArrayWritable(Writable.class, new
Writable[schema.getFields().size()]);
+ }
+
+ @Override
+ public boolean next(NullWritable key, ArrayWritable value) throws
IOException {
+ if (recordIterator == null) {
+ recordIterator = reader.getRecordIterator(schema);
+ }
+
+ if (!recordIterator.hasNext()) {
+ return false;
+ }
+
+ IndexedRecord record = recordIterator.next().getData();
+ ArrayWritable aWritable = (ArrayWritable)
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record,
schema.toAvroSchema());
+ value.set(aWritable.get());
+ count++;
+ return true;
+ }
+
+ @Override
+ public NullWritable createKey() {
+ return null;
+ }
+
+ @Override
+ public ArrayWritable createValue() {
+ return valueObj;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
Review Comment:
🤖 nit: `// TODO Auto-generated method stub` looks like a leftover IDE
placeholder — could you either remove it or replace it with a note explaining
why `getPos()` always returns 0 for Lance?
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLanceInputFormat.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+
+/**
+ * HoodieInputFormat for HUDI datasets which store data in Lance base file
format.
+ */
+@UseFileSplitsFromInputFormat
+public class HoodieLanceInputFormat extends HoodieCopyOnWriteTableInputFormat {
+
+ protected HoodieTimeline filterInstantsTimeline(HoodieTimeline timeline) {
+ return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
Review Comment:
🤖 nit: `filterInstantsTimeline` looks like it's overriding a parent method —
could you add `@Override` here so the compiler can catch any future signature
drift?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java:
##########
@@ -68,14 +70,16 @@ protected HoodieBaseFile getLatestBaseFile() {
}
protected HoodieFileReader createNewFileReader() throws IOException {
- return HoodieIOFactory.getIOFactory(hoodieTable.getStorage())
- .getReaderFactory(this.config.getRecordMerger().getRecordType())
- .getFileReader(config, getLatestBaseFile().getStoragePath());
+ return createNewFileReader(getLatestBaseFile());
}
Review Comment:
🤖 nit: `ext` is a bit terse — the rest of the codebase (e.g.
`HoodieFileWriterFactory`) consistently uses `extension` for the same concept,
could you align here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]