alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r778468264
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -110,59 +136,94 @@ public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, Lis
@Override
public byte[] getContentBytes() throws IOException {
// In case this method is called before realizing records from content
- if (getContent().isPresent()) {
- return getContent().get();
-    } else if (readBlockLazily && !getContent().isPresent() && records == null) {
-      // read block lazily
-      createRecordsFromContentBytes();
+    Option<byte[]> content = getContent();
+
+    checkState(content.isPresent() || records != null, "Block is in invalid state");
+
+ if (content.isPresent()) {
+ return content.get();
}
- return serializeRecords();
Review comment:
Correct. This code should not be reachable (this method is only used on the write path).
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
+import org.apache.hudi.common.fs.inline.InLineFileSystem;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+  public HoodieParquetDataBlock(HoodieLogFile logFile,
+                                FSDataInputStream inputStream,
+                                Option<byte[]> content,
+                                boolean readBlockLazily,
+                                long position, long blockSize, long blockEndPos,
+                                Option<Schema> readerSchema,
+                                Map<HeaderMetadataType, String> header,
+                                Map<HeaderMetadataType, String> footer,
+                                String keyField) {
+    super(content,
+          inputStream,
+          readBlockLazily,
+          Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)),
+          readerSchema,
+          header,
+          footer,
+          keyField,
+          false);
+  }
+
+ public HoodieParquetDataBlock(
+ @Nonnull List<IndexedRecord> records,
+ @Nonnull Map<HeaderMetadataType, String> header,
+ @Nonnull String keyField
+ ) {
+ super(records, header, new HashMap<>(), keyField);
+ }
+
+ public HoodieParquetDataBlock(
+ @Nonnull List<IndexedRecord> records,
+ @Nonnull Map<HeaderMetadataType, String> header
+ ) {
+    super(records, header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ }
+
+ @Override
+ public HoodieLogBlockType getBlockType() {
+ return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+ }
+
+ @Override
+  protected byte[] serializeRecords(List<IndexedRecord> records) throws IOException {
+    if (records.size() == 0) {
+      return new byte[0];
+    }
+
+    Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(writerSchema), writerSchema, Option.empty());
+
+    HoodieAvroParquetConfig avroParquetConfig =
+        new HoodieAvroParquetConfig(
+            writeSupport,
+            // TODO fetch compression codec from the config
+            CompressionCodecName.GZIP,
+            ParquetWriter.DEFAULT_BLOCK_SIZE,
+            ParquetWriter.DEFAULT_PAGE_SIZE,
+            1024 * 1024 * 1024,
+            new Configuration(),
+            Double.parseDouble(String.valueOf(0.1)));//HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {
+      try (HoodieParquetStreamWriter<IndexedRecord> parquetWriter = new HoodieParquetStreamWriter<>(outputStream, avroParquetConfig)) {
+ for (IndexedRecord record : records) {
+ String recordKey = getRecordKey(record);
+ parquetWriter.writeAvro(recordKey, record);
+ }
+ outputStream.flush();
+ }
+ }
+
+ return baos.toByteArray();
+ }
+
+  public static Iterator<IndexedRecord> getProjectedParquetRecordsIterator(Configuration conf,
+                                                                           Schema readerSchema,
+                                                                           InputFile inputFile) throws IOException {
+    AvroReadSupport.setAvroReadSchema(conf, readerSchema);
+    AvroReadSupport.setRequestedProjection(conf, readerSchema);
+    ParquetReader<IndexedRecord> reader =
+        AvroParquetReader.<IndexedRecord>builder(inputFile).withConf(conf).build();
+    return new ParquetReaderIterator<>(reader);
+  }
+
+ /**
+   * NOTE: We're overriding the whole reading sequence to make sure we properly respect
+   *       the requested Reader's schema and only fetch the columns that have been explicitly
+   *       requested by the caller (providing projected Reader's schema)
+ */
+ @Override
+  protected List<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
+    Configuration inlineConf = new Configuration();
+    inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
+
+    HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get();
+
+    Path inlineLogFilePath = InLineFSUtils.getInlineFilePath(
+        blockContentLoc.getLogFile().getPath(),
+        blockContentLoc.getLogFile().getPath().getFileSystem(inlineConf).getScheme(),
+        blockContentLoc.getContentPositionInLogFile(),
+        blockContentLoc.getBlockSize());
+
+ ArrayList<IndexedRecord> records = new ArrayList<>();
+
+ getProjectedParquetRecordsIterator(
+ inlineConf,
Review comment:
Makes sense
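    For anyone following along, a quick usage sketch of the projected read -- the schema and file path below are purely illustrative, and it assumes the method stays `public static` as in this revision:

```java
// Illustrative only: project the read down to the record-key column.
Configuration conf = new Configuration();
Schema projectedSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Projected\",\"fields\":["
        + "{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
InputFile inputFile = HadoopInputFile.fromPath(new Path("/tmp/some-log-block.parquet"), conf);
Iterator<IndexedRecord> it =
    HoodieParquetDataBlock.getProjectedParquetRecordsIterator(conf, projectedSchema, inputFile);
while (it.hasNext()) {
  // Only the projected column(s) are fetched from the Parquet file
  IndexedRecord record = it.next();
}
```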
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -18,59 +18,79 @@
package org.apache.hudi.common.table.log.block;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FSDataInputStream;
-
-import javax.annotation.Nonnull;
-
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
 * DataBlock contains a list of records serialized using formats compatible with the base file format.
* For each base file format there is a corresponding DataBlock format.
- *
+ * <p>
* The Datablock contains:
* 1. Data Block version
* 2. Total number of records in the block
* 3. Actual serialized content of the records
*/
public abstract class HoodieDataBlock extends HoodieLogBlock {
- protected List<IndexedRecord> records;
- protected Schema schema;
- protected String keyField;
+ // TODO rebase records/content to leverage Either to warrant
+ // that they are mutex (used by read/write flows respectively)
+ private List<IndexedRecord> records;
-  public HoodieDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
-      @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
-      @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
-      FSDataInputStream inputStream, boolean readBlockLazily) {
-    super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
-    this.keyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
-  }
+ /**
+ * Dot-path notation reference to the key field w/in the record's schema
+ */
+ private final String keyFieldRef;
Review comment:
`keyFieldName` is confusing -- it's not really a field name but rather a fully-qualified (dot-path) reference within the schema.
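    To make the intent concrete: a dot-path reference can point at a nested field (e.g. `key.recordKey`), and resolution walks the record level by level. Rough sketch, with a hypothetical helper and field names:

```java
import org.apache.avro.generic.GenericRecord;

// Hypothetical helper, only to illustrate what a dot-path ("FQ") reference means.
final class KeyFieldRefExample {
  static Object resolve(GenericRecord record, String keyFieldRef) {
    Object current = record;
    for (String part : keyFieldRef.split("\\.")) {
      if (!(current instanceof GenericRecord)) {
        return null;
      }
      current = ((GenericRecord) current).get(part);
    }
    return current;
  }
}

// KeyFieldRefExample.resolve(rec, "_hoodie_record_key") -> top-level meta field
// KeyFieldRefExample.resolve(rec, "key.recordKey")      -> nested field, two levels down
```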
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
##########
@@ -109,7 +116,25 @@ public long getLogBlockLength() {
 * Type of the log block WARNING: This enum is serialized as the ordinal. Only add new enums at the end.
*/
public enum HoodieLogBlockType {
-    COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, HFILE_DATA_BLOCK
+ COMMAND_BLOCK(":command"),
+ DELETE_BLOCK(":delete"),
+ CORRUPT_BLOCK(":corrupted"),
Review comment:
Corrupt is a valid adjective, but as far as I can tell it's rarely used in the context of
"broken" things (it's more often used to describe a lack of moral integrity, as in "the
politician is corrupt").
https://english.stackexchange.com/questions/405182/corrupt-or-corrupted
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
##########
@@ -224,6 +226,33 @@ public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema)
return new HoodieAvroDataBlock(records, readerSchema);
}
+ private static byte[] compress(String text) {
Review comment:
This is only used in `HoodieAvroDataBlock` w/in the method that is
deprecated. Do we want to hang on to it?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+import org.apache.hudi.parquet.io.ByteBufferBackedInputFile;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.InputFile;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+ public HoodieParquetDataBlock(
+ HoodieLogFile logFile,
+ FSDataInputStream inputStream,
+ Option<byte[]> content,
+ boolean readBlockLazily, long position, long blockSize, long blockEndpos,
+ Option<Schema> readerSchema,
+ Map<HeaderMetadataType, String> header,
+ Map<HeaderMetadataType, String> footer,
+ String keyField
+ ) {
+ super(
+ content,
+ inputStream,
+ readBlockLazily,
+        Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)),
+ readerSchema,
+ header,
+ footer,
+ keyField,
+ false);
+ }
+
+ public HoodieParquetDataBlock(
+ @Nonnull List<IndexedRecord> records,
+ @Nonnull Map<HeaderMetadataType, String> header,
+ @Nonnull String keyField
+ ) {
+ super(records, header, new HashMap<>(), keyField);
+ }
+
+ public HoodieParquetDataBlock(
+ @Nonnull List<IndexedRecord> records,
+ @Nonnull Map<HeaderMetadataType, String> header
+ ) {
+    super(records, header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ }
+
+ @Override
+ public HoodieLogBlockType getBlockType() {
+ return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+ }
+
+ @Override
+  protected byte[] serializeRecords(List<IndexedRecord> records) throws IOException {
+ if (records.size() == 0) {
+ return new byte[0];
+ }
+
+    Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+
+ HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(writerSchema), writerSchema, Option.empty());
+
+ HoodieAvroParquetConfig avroParquetConfig =
+ new HoodieAvroParquetConfig(
+ writeSupport,
+ // TODO fetch compression codec from the config
+ CompressionCodecName.GZIP,
Review comment:
It's a fair call given that this is new code.
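    For the follow-up, a sketch of what resolving the codec from config could look like -- the property key below is a placeholder, not an existing Hudi config:

```java
import java.util.Properties;

import org.apache.parquet.hadoop.metadata.CompressionCodecName;

final class CodecFromConfigExample {
  // Sketch: pick the codec from the write config instead of hard-coding GZIP.
  static CompressionCodecName resolveCodec(Properties writeConfig) {
    String codecName =
        writeConfig.getProperty("hoodie.logfile.data.block.parquet.compression.codec", "GZIP");
    return CompressionCodecName.fromConf(codecName);
  }
}
```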
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##########
@@ -125,6 +126,11 @@
.withAlternatives("hoodie.table.rt.file.format")
.withDocumentation("Log format used for the delta logs.");
+ public static final ConfigProperty<String> LOG_BLOCK_TYPE = ConfigProperty
Review comment:
How would this work? How's block-type going to be determined on the
write path?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -288,28 +288,14 @@ public synchronized void close() {
}
}
-  static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {
+  static class SeekableByteArrayInputStream extends ByteBufferBackedInputStream implements Seekable, PositionedReadable {
Review comment:
It's not changing; this just abstracts out common functionality that's required elsewhere.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.parquet.io;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+
+/**
+ * Implementation of the {@link OutputFile} backed by {@link java.io.OutputStream}
+ */
+public class OutputStreamBackedOutputFile implements OutputFile {
Review comment:
Correct. These are needed to be able to write Parquet into `OutputStream`
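    For readers of this thread, the rough shape of such an adapter (simplified sketch, not the exact implementation in this PR):

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

// Sketch: expose an FSDataOutputStream as Parquet's OutputFile so a ParquetWriter
// can target an arbitrary stream instead of a Path.
class StreamBackedOutputFileSketch implements OutputFile {
  private final FSDataOutputStream out;

  StreamBackedOutputFileSketch(FSDataOutputStream out) {
    this.out = out;
  }

  @Override
  public PositionOutputStream create(long blockSizeHint) {
    return new PositionOutputStream() {
      @Override
      public long getPos() throws IOException {
        // FSDataOutputStream already tracks the number of bytes written
        return out.getPos();
      }

      @Override
      public void write(int b) throws IOException {
        out.write(b);
      }
    };
  }

  @Override
  public PositionOutputStream createOrOverwrite(long blockSizeHint) {
    return create(blockSizeHint);
  }

  @Override
  public boolean supportsBlockSize() {
    return false;
  }

  @Override
  public long defaultBlockSize() {
    return 0;
  }
}
```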
##########
File path:
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.parquet.io.OutputStreamBackedOutputFile;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.OutputFile;
+
+import java.io.IOException;
+
+// TODO unify w/ HoodieParquetWriter
+public class HoodieParquetStreamWriter<R extends IndexedRecord> implements AutoCloseable {
Review comment:
The `ParquetWriter` ctors that are needed here are package-private, so I can't extend
`HoodieParquetWriter` to reuse them -- instead I had to wrap `ParquetWriter` here to be
able to write into an `OutputStream`.
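    For reference, the other common way around the package-private ctors is to extend `ParquetWriter.Builder` and hand it an `OutputFile`. A sketch (class name is made up), in case we'd rather go that route when unifying with `HoodieParquetWriter`:

```java
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.OutputFile;

// Sketch only: a builder carrying an explicit WriteSupport and targeting an OutputFile.
class StreamParquetWriterBuilder extends ParquetWriter.Builder<IndexedRecord, StreamParquetWriterBuilder> {
  private final WriteSupport<IndexedRecord> writeSupport;

  StreamParquetWriterBuilder(OutputFile outputFile, WriteSupport<IndexedRecord> writeSupport) {
    super(outputFile);
    this.writeSupport = writeSupport;
  }

  @Override
  protected StreamParquetWriterBuilder self() {
    return this;
  }

  @Override
  protected WriteSupport<IndexedRecord> getWriteSupport(Configuration conf) {
    return writeSupport;
  }
}

// Usage:
//   ParquetWriter<IndexedRecord> writer =
//       new StreamParquetWriterBuilder(outputFile, writeSupport)
//           .withCompressionCodec(CompressionCodecName.GZIP)
//           .build();
```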
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
##########
@@ -109,7 +116,25 @@ public long getLogBlockLength() {
 * Type of the log block WARNING: This enum is serialized as the ordinal. Only add new enums at the end.
*/
public enum HoodieLogBlockType {
-    COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, HFILE_DATA_BLOCK
+ COMMAND_BLOCK(":command"),
Review comment:
To distinguish b/w format-based blocks and action-based blocks
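    If the ids are also meant to be resolved from user-facing config, a reverse lookup would be the usual companion. Sketch only (assumes the enum stores the id it is constructed with, e.g. a `private final String id` field):

```java
// Would live inside HoodieLogBlockType; not part of the PR as shown.
public static HoodieLogBlockType fromId(String id) {
  for (HoodieLogBlockType type : HoodieLogBlockType.values()) {
    if (type.id.equals(id)) {
      return type;
    }
  }
  throw new IllegalArgumentException("Invalid HoodieLogBlockType id: " + id);
}
```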
##########
File path:
hudi-common/src/test/java/org/apache/hudi/common/testutils/HadoopMapRedUtils.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hudi.common.util.Option;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HadoopMapRedUtils {
Review comment:
Fair call, but this is the only way the Parquet reader exposes how much was actually read.
If we don't want to add these utilities, we will have to forgo those assertions.
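    For context, the shape of what's needed is roughly a `Reporter` backed by an in-memory `Counters`, so the byte-read counters the Parquet reader populates can be asserted on in tests. Simplified sketch, not necessarily this PR's exact utility:

```java
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Reporter;

// Sketch: a Reporter that simply accumulates into an in-memory Counters instance.
class AccumulatingReporter implements Reporter {
  private final Counters counters = new Counters();

  Counters getCounters() {
    return counters;
  }

  @Override public Counters.Counter getCounter(Enum<?> name) { return counters.findCounter(name); }
  @Override public Counters.Counter getCounter(String group, String name) { return counters.findCounter(group, name); }
  @Override public void incrCounter(Enum<?> key, long amount) { counters.incrCounter(key, amount); }
  @Override public void incrCounter(String group, String counter, long amount) { counters.incrCounter(group, counter, amount); }
  @Override public InputSplit getInputSplit() { throw new UnsupportedOperationException("Input split not available"); }
  @Override public float getProgress() { return 0; }
  @Override public void setStatus(String status) { }
  @Override public void progress() { }
}
```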
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -110,59 +132,97 @@ public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, Lis
@Override
public byte[] getContentBytes() throws IOException {
// In case this method is called before realizing records from content
- if (getContent().isPresent()) {
- return getContent().get();
-    } else if (readBlockLazily && !getContent().isPresent() && records == null) {
-      // read block lazily
-      createRecordsFromContentBytes();
+    Option<byte[]> content = getContent();
+
+    checkState(content.isPresent() || records != null, "Block is in invalid state");
+
+ if (content.isPresent()) {
+ return content.get();
}
- return serializeRecords();
+ return serializeRecords(records);
}
- public abstract HoodieLogBlockType getBlockType();
+  protected static Schema getWriterSchema(Map<HeaderMetadataType, String> logBlockHeader) {
+    return new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA));
+ }
- public List<IndexedRecord> getRecords() {
+ /**
+ * Returns all the records contained w/in this block
+ */
+ public final List<IndexedRecord> getRecords() {
if (records == null) {
try {
        // in case records are absent, read content lazily and then convert to IndexedRecords
- createRecordsFromContentBytes();
+ records = readRecordsFromBlockPayload();
} catch (IOException io) {
        throw new HoodieIOException("Unable to convert content bytes to records", io);
}
}
return records;
}
+ public Schema getSchema() {
+ return readerSchema;
+ }
+
/**
   * Batch get of keys of interest. Implementation can choose to either do full scan and return matched entries or
* do a seek based parsing and return matched entries.
+ *
* @param keys keys of interest.
* @return List of IndexedRecords for the keys of interest.
- * @throws IOException
+   * @throws IOException in case of failures encountered when reading/parsing records
*/
- public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
-    throw new UnsupportedOperationException("On demand batch get based on interested keys not supported");
- }
+  public final List<IndexedRecord> getRecords(List<String> keys) throws IOException {
+ boolean fullScan = keys.isEmpty();
+ if (enablePointLookups && !fullScan) {
+ return lookupRecords(keys);
+ }
- public Schema getSchema() {
- // if getSchema was invoked before converting byte [] to records
- if (records == null) {
- getRecords();
+    // Otherwise, we fetch all the records and filter out all the records, but the
+    // ones requested
+ List<IndexedRecord> allRecords = getRecords();
+ if (fullScan) {
+ return allRecords;
}
- return schema;
+
+ HashSet<String> keySet = new HashSet<>(keys);
+ return allRecords.stream()
+ .filter(record -> keySet.contains(getRecordKey(record)))
+ .collect(Collectors.toList());
}
- protected void createRecordsFromContentBytes() throws IOException {
+  protected List<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
if (readBlockLazily && !getContent().isPresent()) {
// read log block contents from disk
inflate();
}
- deserializeRecords();
+ try {
+ return deserializeRecords(getContent().get());
+ } finally {
+ // Free up content to be GC'd by deflating the block
+ deflate();
+ }
+ }
+
+  protected List<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
+    throw new UnsupportedOperationException(
+        String.format("Point-wise records lookups are not supported by this Data block type (%s)", getBlockType())
+ );
}
- protected abstract byte[] serializeRecords() throws IOException;
+  protected abstract byte[] serializeRecords(List<IndexedRecord> records) throws IOException;
+
+  protected abstract List<IndexedRecord> deserializeRecords(byte[] content) throws IOException;
- protected abstract void deserializeRecords() throws IOException;
+ public abstract HoodieLogBlockType getBlockType();
+
+ protected String getRecordKey(IndexedRecord record) {
Review comment:
This is just an abstraction of the existing behavior: blocks currently don't support
virtual keys, and if we want to add that support I'd rather do it in a standalone PR.
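    For clarity, the existing behavior being abstracted here is essentially "read the key straight off the record's key field". Roughly (single-level field shown; a nested dot-path ref would need to walk the record, and virtual keys would go through the key generator instead):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

// Sketch of the existing behavior, not the exact code in this PR.
final class RecordKeyExample {
  static String getRecordKey(IndexedRecord record, String keyFieldRef) {
    Object key = ((GenericRecord) record).get(keyFieldRef);
    return key == null ? null : key.toString();
  }
}
```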
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]