nsivabalan commented on code in PR #13383:
URL: https://github.com/apache/hudi/pull/13383#discussion_r2122451477
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -137,16 +136,16 @@ public static String[] getExpressionIndexColumnNames() {
};
}
- public static List<Row> getRowsWithExpressionIndexMetadata(List<Row>
rowsForFilePath, String partition, String filePath, long fileSize) {
- return rowsForFilePath.stream().map(row -> {
- scala.collection.immutable.Seq<Object> indexMetadata =
JavaScalaConverters.convertJavaListToScalaList(Arrays.asList(partition,
filePath, fileSize));
+ public static ClosableIterator<Row>
getRowsWithExpressionIndexMetadata(ClosableIterator<InternalRow>
rowsForFilePath, SparkRowSerDe sparkRowSerDe, String partition, String
filePath, long fileSize) {
+ return new CloseableMappingIterator<>(rowsForFilePath, row -> {
+ Seq<Object> indexMetadata =
JavaScalaConverters.convertJavaListToScalaList(Arrays.asList(partition,
filePath, fileSize));
Row expressionIndexRow = Row.fromSeq(indexMetadata);
List<Row> rows = new ArrayList<>(2);
- rows.add(row);
+ rows.add(sparkRowSerDe.deserializeRow(row));
rows.add(expressionIndexRow);
- scala.collection.immutable.Seq<Row> rowSeq =
JavaScalaConverters.convertJavaListToScalaList(rows);
+ Seq<Row> rowSeq = JavaScalaConverters.convertJavaListToScalaList(rows);
Review Comment:
Can we try something like the following:
```
scala.collection.immutable.List.apply(val1, val2)
```
and simplify lines 141 to 146?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -139,17 +136,15 @@ public abstract class BaseHoodieLogRecordReader<T> {
// Allows to consider inflight instants while merging log records
protected boolean allowInflightInstants;
- protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext,
HoodieStorage storage, List<String> logFilePaths,
+ protected BaseHoodieLogRecordReader(HoodieReaderContext<T> readerContext,
HoodieTableMetaClient hoodieTableMetaClient, HoodieStorage storage,
List<String> logFilePaths,
boolean reverseReader, int bufferSize,
Option<InstantRange> instantRange,
boolean withOperationField, boolean
forceFullScan, Option<String> partitionNameOverride,
Option<String> keyFieldOverride, boolean
enableOptimizedLogBlocksScan, FileGroupRecordBuffer<T> recordBuffer,
boolean allowInflightInstants) {
this.readerContext = readerContext;
- this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
+ this.readerSchema = readerContext.getSchemaHandler() != null ?
readerContext.getSchemaHandler().getRequiredSchema() : null;
this.latestInstantTime = readerContext.getLatestCommitTime();
- this.hoodieTableMetaClient = HoodieTableMetaClient.builder()
- .setStorage(storage)
- .setBasePath(readerContext.getTablePath()).build();
+ this.hoodieTableMetaClient = hoodieTableMetaClient;
Review Comment:
So, with this change, we are going to transfer the meta client from the driver to
all executors?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java:
##########
@@ -108,6 +122,9 @@ public void processNextDataRecord(BufferedRecord<T> record,
Serializable index)
@Override
public void processDeleteBlock(HoodieDeleteBlock deleteBlock) {
// no-op
+ if (emitDelete) {
+ currentInstantLogBlocks.add(deleteBlock);
Review Comment:
Was this a bug before?
Did we add tests for this?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -139,17 +136,15 @@ public abstract class BaseHoodieLogRecordReader<T> {
// Allows to consider inflight instants while merging log records
protected boolean allowInflightInstants;
- protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext,
HoodieStorage storage, List<String> logFilePaths,
+ protected BaseHoodieLogRecordReader(HoodieReaderContext<T> readerContext,
HoodieTableMetaClient hoodieTableMetaClient, HoodieStorage storage,
List<String> logFilePaths,
boolean reverseReader, int bufferSize,
Option<InstantRange> instantRange,
boolean withOperationField, boolean
forceFullScan, Option<String> partitionNameOverride,
Option<String> keyFieldOverride, boolean
enableOptimizedLogBlocksScan, FileGroupRecordBuffer<T> recordBuffer,
boolean allowInflightInstants) {
this.readerContext = readerContext;
- this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
+ this.readerSchema = readerContext.getSchemaHandler() != null ?
readerContext.getSchemaHandler().getRequiredSchema() : null;
this.latestInstantTime = readerContext.getLatestCommitTime();
- this.hoodieTableMetaClient = HoodieTableMetaClient.builder()
- .setStorage(storage)
- .setBasePath(readerContext.getTablePath()).build();
+ this.hoodieTableMetaClient = hoodieTableMetaClient;
Review Comment:
This may not be required as part of this patch, but I was chasing down the
necessity of this HoodieTableMetaClient.
We use the commits timeline in the code snippet below:
```
if (logBlock.getBlockType() != COMMAND_BLOCK) {
if (!allowInflightInstants
&& (inflightInstantsTimeline.containsInstant(instantTime)) ||
!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)) {
// hit an uncommitted block possibly from a failed write, move
to the next one and skip processing this one
continue;
}
```
All we need here is:
```
1. start entry of active timeline. String
2. Set<String> inflightInstantTimes
3. Set<String> completedInstantTimes
```
We could send these from the driver to the executors rather than sending over
the entire HoodieTableMetaClient.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java:
##########
@@ -37,13 +37,15 @@
import java.io.Serializable;
import java.util.ArrayDeque;
+import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
public class UnmergedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T>
{
private final Deque<HoodieLogBlock> currentInstantLogBlocks;
private ClosableIterator<T> recordIterator;
+ private boolean isDeleteBlock;
Review Comment:
Minor: can we rename this to `isCurrentBlockDeleteBlock`, or something along
those lines?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -170,6 +171,9 @@ public Comparable convertValueToEngineType(Comparable
value) {
@Override
public InternalRow getDeleteRow(InternalRow record, String recordKey) {
- throw new UnsupportedOperationException("Not supported for " +
this.getClass().getSimpleName());
+ if (record != null) {
+ return record;
+ }
+ return new HoodieInternalRow(null, null, UTF8String.fromString(recordKey),
null, null, null, false);
Review Comment:
We should also be able to pass in the partition path as an arg and set it in the
record, right?
Or is the partition meta field pretty much never used (within the file group
reader) with all the other recent changes we landed?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1701,31 +1677,38 @@ private static
List<HoodieColumnRangeMetadata<Comparable>> readColumnRangeMetada
* Read column range metadata from log file.
*/
@VisibleForTesting
- public static List<HoodieColumnRangeMetadata<Comparable>>
getLogFileColumnRangeMetadata(String filePath, HoodieTableMetaClient
datasetMetaClient,
+ public static List<HoodieColumnRangeMetadata<Comparable>>
getLogFileColumnRangeMetadata(String filePath, String partitionPath,
+
HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex, Option<Schema> writerSchemaOpt,
int maxBufferSize) throws IOException {
if (writerSchemaOpt.isPresent()) {
List<Pair<String, Schema.Field>> fieldsToIndex =
columnsToIndex.stream().map(fieldName ->
HoodieAvroUtils.getSchemaForField(writerSchemaOpt.get(), fieldName))
.collect(Collectors.toList());
// read log file records without merging
- List<HoodieRecord> records = new ArrayList<>();
- HoodieUnMergedLogRecordScanner scanner =
HoodieUnMergedLogRecordScanner.newBuilder()
- .withStorage(datasetMetaClient.getStorage())
- .withBasePath(datasetMetaClient.getBasePath())
- .withLogFilePaths(Collections.singletonList(filePath))
- .withBufferSize(maxBufferSize)
-
.withLatestInstantTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime())
- .withReaderSchema(writerSchemaOpt.get())
- .withTableMetaClient(datasetMetaClient)
- .withLogRecordScannerCallback(records::add)
+ HoodieLogFile logFile = new HoodieLogFile(filePath);
+ FileSlice fileSlice = new FileSlice(partitionPath,
logFile.getDeltaCommitTime(), logFile.getFileId());
+ fileSlice.addLogFile(logFile);
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(MAX_MEMORY_FOR_MERGE.key(),
Long.toString(maxBufferSize));
+ // Currently only avro is fully supported for extracting column ranges
(see HUDI-8585)
+ HoodieReaderContext readerContext = new
HoodieAvroReaderContext(datasetMetaClient.getStorageConf(),
datasetMetaClient.getTableConfig(), Option.empty(), Option.empty());
+ HoodieFileGroupReader fileGroupReader =
HoodieFileGroupReader.newBuilder()
+ .withReaderContext(readerContext)
Review Comment:
Can you help me understand where exactly we set the props so that this will
end up using UnmergedFileGroupRecordBuffer?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]