alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r778523226
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -88,76 +92,24 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile
logFile, Schema readerSc
}
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema
readerSchema, int bufferSize,
- boolean readBlockLazily, boolean reverseReader,
boolean enableInlineReading,
+ boolean readBlockLazily, boolean reverseReader,
boolean enableRecordLookups,
String keyField) throws IOException {
- FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(),
bufferSize);
- this.logFile = logFile;
- this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
+ // NOTE: We repackage {@code HoodieLogFile} here to make sure that the
provided path
+ // is prefixed with an appropriate scheme given that we're not
propagating the FS
+ // further
+ this.logFile = new HoodieLogFile(fs.getFileStatus(logFile.getPath()));
Review comment:
👍
##########
File path:
hudi-common/src/main/java/org/apache/hudi/parquet/io/ByteBufferBackedInputFile.java
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.parquet.io;
+
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+/**
+ * Implementation of {@link InputFile} backed by {@code byte[]} buffer
+ */
+public class ByteBufferBackedInputFile implements InputFile {
+ private final byte[] buffer;
+ private final int offset;
+ private final int length;
+
+ public ByteBufferBackedInputFile(byte[] buffer, int offset, int length) {
+ this.buffer = buffer;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ public ByteBufferBackedInputFile(byte[] buffer) {
+ this(buffer, 0, buffer.length);
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
Review comment:
These classes are simple wrappers (around `ByteBufferBackedInputStream`,
which already has a UT) and are tested by FTs (they're used in DataBlocks). Do we still
want to test them w/ standalone UTs?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -72,7 +76,7 @@
private long reverseLogFilePosition;
private long lastReverseLogFilePosition;
private boolean reverseReader;
- private boolean enableInlineReading;
+ private boolean enableRecordLookups;
Review comment:
Not sure I fully understand your point -- this flag in particular is
used to gate whether we allow point lookups for records when the
`getRecords(<keys>)` method of the Block is used.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
##########
@@ -148,65 +165,40 @@ public HoodieLogBlockType getBlockType() {
}
@Override
- protected void createRecordsFromContentBytes() throws IOException {
- if (enableInlineReading) {
- getRecords(Collections.emptyList());
- } else {
- super.createRecordsFromContentBytes();
- }
- }
-
- @Override
- public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
- readWithInlineFS(keys);
- return records;
- }
-
- private void readWithInlineFS(List<String> keys) throws IOException {
- boolean enableFullScan = keys.isEmpty();
- // Get schema from the header
- Schema writerSchema = new
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
- // If readerSchema was not present, use writerSchema
- if (schema == null) {
- schema = writerSchema;
- }
- Configuration conf = new Configuration();
- CacheConfig cacheConf = new CacheConfig(conf);
+ protected List<IndexedRecord> lookupRecords(List<String> keys) throws
IOException {
Configuration inlineConf = new Configuration();
inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl",
InLineFileSystem.class.getName());
+ HoodieLogBlockContentLocation blockContentLoc =
getBlockContentLocation().get();
+
Path inlinePath = InLineFSUtils.getInlineFilePath(
- getBlockContentLocation().get().getLogFile().getPath(),
-
getBlockContentLocation().get().getLogFile().getPath().getFileSystem(conf).getScheme(),
- getBlockContentLocation().get().getContentPositionInLogFile(),
- getBlockContentLocation().get().getBlockSize());
- if (!enableFullScan) {
- // HFile read will be efficient if keys are sorted, since on storage,
records are sorted by key. This will avoid unnecessary seeks.
- Collections.sort(keys);
+ blockContentLoc.getLogFile().getPath(),
+
blockContentLoc.getLogFile().getPath().getFileSystem(inlineConf).getScheme(),
+ blockContentLoc.getContentPositionInLogFile(),
+ blockContentLoc.getBlockSize());
+
+ // HFile read will be efficient if keys are sorted, since on storage,
records are sorted by key. This will avoid unnecessary seeks.
+ Collections.sort(keys);
+
+ try (HoodieHFileReader<IndexedRecord> reader =
+ new HoodieHFileReader<>(inlineConf, inlinePath, new
CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf))) {
+ // Get writer's schema from the header
+ List<Pair<String, IndexedRecord>> logRecords = reader.readRecords(keys,
readerSchema);
+ return
logRecords.stream().map(Pair::getSecond).collect(Collectors.toList());
}
- HoodieHFileReader reader = new HoodieHFileReader(inlineConf, inlinePath,
cacheConf, inlinePath.getFileSystem(inlineConf));
- List<org.apache.hadoop.hbase.util.Pair<String, IndexedRecord>> logRecords
= enableFullScan ? reader.readAllRecords(writerSchema, schema) :
- reader.readRecords(keys, schema);
- reader.close();
- this.records = logRecords.stream().map(t ->
t.getSecond()).collect(Collectors.toList());
}
@Override
- protected void deserializeRecords() throws IOException {
+ protected List<IndexedRecord> deserializeRecords(byte[] content) throws
IOException {
+ checkState(readerSchema != null, "Reader's schema has to be non-null");
Review comment:
Correct. Now it'd never be null
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]