vinothchandar commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r773607983
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java
##########
@@ -74,40 +75,49 @@ public static Path getInlineFilePath(Path outerPath, String origScheme, long inL
* @return Outer file Path from the InLineFS Path
*/
public static Path getOuterFilePathFromInlinePath(Path inlineFSPath) {
- final String scheme = inlineFSPath.getParent().getName();
+ assertInlineFSPath(inlineFSPath);
+
+ final String baseFileScheme = inlineFSPath.getParent().getName();
Review comment:
Can we stick to inner/outer terminology, given "base file" has a very different meaning in Hudi?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -110,59 +132,97 @@ public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, Lis
@Override
public byte[] getContentBytes() throws IOException {
// In case this method is called before realizing records from content
- if (getContent().isPresent()) {
- return getContent().get();
- } else if (readBlockLazily && !getContent().isPresent() && records == null) {
- // read block lazily
- createRecordsFromContentBytes();
+ Option<byte[]> content = getContent();
+
+ checkState(content.isPresent() || records != null, "Block is in invalid state");
+
+ if (content.isPresent()) {
+ return content.get();
}
- return serializeRecords();
+ return serializeRecords(records);
}
- public abstract HoodieLogBlockType getBlockType();
+ protected static Schema getWriterSchema(Map<HeaderMetadataType, String> logBlockHeader) {
+ return new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA));
+ }
- public List<IndexedRecord> getRecords() {
+ /**
+ * Returns all the records contained w/in this block
+ */
+ public final List<IndexedRecord> getRecords() {
if (records == null) {
try {
// in case records are absent, read content lazily and then convert to IndexedRecords
- createRecordsFromContentBytes();
+ records = readRecordsFromBlockPayload();
} catch (IOException io) {
throw new HoodieIOException("Unable to convert content bytes to records", io);
}
}
return records;
}
+ public Schema getSchema() {
+ return readerSchema;
+ }
+
/**
* Batch get of keys of interest. Implementation can choose to either do full scan and return matched entries or
* do a seek based parsing and return matched entries.
+ *
* @param keys keys of interest.
* @return List of IndexedRecords for the keys of interest.
- * @throws IOException
+ * @throws IOException in case of failures encountered when reading/parsing records
*/
- public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
- throw new UnsupportedOperationException("On demand batch get based on interested keys not supported");
- }
+ public final List<IndexedRecord> getRecords(List<String> keys) throws IOException {
+ boolean fullScan = keys.isEmpty();
+ if (enablePointLookups && !fullScan) {
+ return lookupRecords(keys);
+ }
- public Schema getSchema() {
- // if getSchema was invoked before converting byte [] to records
- if (records == null) {
- getRecords();
+ // Otherwise, we fetch all the records and filter out all the records, but the
+ // ones requested
+ List<IndexedRecord> allRecords = getRecords();
+ if (fullScan) {
+ return allRecords;
}
- return schema;
+
+ HashSet<String> keySet = new HashSet<>(keys);
+ return allRecords.stream()
+ .filter(record -> keySet.contains(getRecordKey(record)))
+ .collect(Collectors.toList());
}
- protected void createRecordsFromContentBytes() throws IOException {
+ protected List<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
if (readBlockLazily && !getContent().isPresent()) {
// read log block contents from disk
inflate();
}
- deserializeRecords();
+ try {
+ return deserializeRecords(getContent().get());
+ } finally {
+ // Free up content to be GC'd by deflating the block
+ deflate();
Review comment:
Note to self: this should not affect the returned value, since that's a new list.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##########
@@ -125,6 +126,11 @@
.withAlternatives("hoodie.table.rt.file.format")
.withDocumentation("Log format used for the delta logs.");
+ public static final ConfigProperty<String> LOG_BLOCK_TYPE = ConfigProperty
Review comment:
I would love for this to be figured out dynamically from the log block header. We read the headers out first anyway, before processing the blocks themselves. This way a streaming writer can be ingesting Avro blocks while a background delete/backfill adds columnar data into the log blocks. Let's remove the table config?
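For instance, something along these lines on the read side (just a sketch, assuming the existing `HoodieLogFormat.Reader` iterator; the handler callbacks are hypothetical, not existing code):
```java
import java.util.function.Consumer;

import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;

// Each block read from the log already carries its own type, so processing can
// dispatch per block instead of relying on a table-level config.
private void processNextBlock(HoodieLogFormat.Reader reader,
                              Consumer<HoodieDataBlock> dataBlockHandler,
                              Consumer<HoodieLogBlock> deleteBlockHandler) {
  HoodieLogBlock block = reader.next();
  switch (block.getBlockType()) {
    case AVRO_DATA_BLOCK:
    case HFILE_DATA_BLOCK:
      // data blocks of different formats could then coexist within the same log file
      dataBlockHandler.accept((HoodieDataBlock) block);
      break;
    case DELETE_BLOCK:
      deleteBlockHandler.accept(block);
      break;
    default:
      // command/corrupt blocks are handled elsewhere
      break;
  }
}
```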
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -18,59 +18,79 @@
package org.apache.hudi.common.table.log.block;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FSDataInputStream;
-
-import javax.annotation.Nonnull;
-
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* DataBlock contains a list of records serialized using formats compatible with the base file format.
* For each base file format there is a corresponding DataBlock format.
- *
+ * <p>
* The Datablock contains:
* 1. Data Block version
* 2. Total number of records in the block
* 3. Actual serialized content of the records
*/
public abstract class HoodieDataBlock extends HoodieLogBlock {
- protected List<IndexedRecord> records;
- protected Schema schema;
- protected String keyField;
+ // TODO rebase records/content to leverage Either to warrant
+ // that they are mutex (used by read/write flows respectively)
+ private List<IndexedRecord> records;
- public HoodieDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
- @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
- @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
- FSDataInputStream inputStream, boolean readBlockLazily) {
- super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
- this.keyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
- }
+ /**
+ * Dot-path notation reference to the key field w/in the record's schema
+ */
+ private final String keyFieldRef;
Review comment:
rename to `keyFieldName`, to be consistent
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
##########
@@ -224,6 +226,33 @@ public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema)
return new HoodieAvroDataBlock(records, readerSchema);
}
+ private static byte[] compress(String text) {
Review comment:
Can we move this to `StringUtils`?
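E.g., roughly (a sketch of what lifting these into `StringUtils` could look like; exact placement and signatures are just a suggestion):
```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

import org.apache.hudi.exception.HoodieIOException;

public static byte[] compress(String text) {
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  // closing the DeflaterOutputStream finishes the compression before we read the buffer
  try (DeflaterOutputStream dos = new DeflaterOutputStream(baos)) {
    dos.write(text.getBytes(StandardCharsets.UTF_8));
  } catch (IOException e) {
    throw new HoodieIOException("IOException while compressing text " + text, e);
  }
  return baos.toByteArray();
}

public static String decompress(byte[] bytes) {
  try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes));
       ByteArrayOutputStream out = new ByteArrayOutputStream()) {
    byte[] buffer = new byte[8192];
    int len;
    while ((len = in.read(buffer)) > 0) {
      out.write(buffer, 0, len);
    }
    return new String(out.toByteArray(), StandardCharsets.UTF_8);
  } catch (IOException e) {
    throw new HoodieIOException("IOException while decompressing text", e);
  }
}
```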
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
##########
@@ -157,17 +161,17 @@ protected void deserializeRecords() throws IOException {
// 3. Read the content
for (int i = 0; i < totalRecords; i++) {
int recordLength = dis.readInt();
- BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(getContent().get(), dis.getNumberOfBytesRead(),
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(content, dis.getNumberOfBytesRead(),
recordLength, decoderCache.get());
decoderCache.set(decoder);
IndexedRecord record = reader.read(null, decoder);
records.add(record);
dis.skipBytes(recordLength);
}
+
dis.close();
- this.records = records;
- // Free up content to be GC'd, deflate
- deflate();
Review comment:
IIUC, since we just pass the records in now and don't hang onto them, we are good. This is a nice cleanup!
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -18,59 +18,79 @@
package org.apache.hudi.common.table.log.block;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FSDataInputStream;
-
-import javax.annotation.Nonnull;
-
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* DataBlock contains a list of records serialized using formats compatible with the base file format.
* For each base file format there is a corresponding DataBlock format.
- *
+ * <p>
* The Datablock contains:
* 1. Data Block version
* 2. Total number of records in the block
* 3. Actual serialized content of the records
*/
public abstract class HoodieDataBlock extends HoodieLogBlock {
- protected List<IndexedRecord> records;
- protected Schema schema;
- protected String keyField;
+ // TODO rebase records/content to leverage Either to warrant
+ // that they are mutex (used by read/write flows respectively)
+ private List<IndexedRecord> records;
- public HoodieDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
- @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
- @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
- FSDataInputStream inputStream, boolean readBlockLazily) {
- super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
- this.keyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
- }
+ /**
+ * Dot-path notation reference to the key field w/in the record's schema
+ */
+ private final String keyFieldRef;
+
+ private final boolean enablePointLookups;
- public HoodieDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header,
- @Nonnull Map<HeaderMetadataType, String> footer, String keyField) {
- this(header, footer, Option.empty(), Option.empty(), null, false);
+ protected final Schema readerSchema;
+
+ /**
+ * NOTE: This ctor is used on the write-path (ie when records ought to be written into the log)
+ */
+ public HoodieDataBlock(List<IndexedRecord> records,
+ Map<HeaderMetadataType, String> header,
+ Map<HeaderMetadataType, String> footer,
+ String keyFieldRef) {
+ super(header, footer, Option.empty(), Option.empty(), null, false);
this.records = records;
- this.schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
- this.keyField = keyField;
+ this.keyFieldRef = keyFieldRef;
+ // If no reader-schema has been provided assume writer-schema as one
+ this.readerSchema = getWriterSchema(super.getLogBlockHeader());
+ this.enablePointLookups = false;
}
- protected HoodieDataBlock(Option<byte[]> content, @Nonnull FSDataInputStream inputStream, boolean readBlockLazily,
- Option<HoodieLogBlockContentLocation> blockContentLocation, Schema readerSchema,
- @Nonnull Map<HeaderMetadataType, String> headers, @Nonnull Map<HeaderMetadataType,
- String> footer, String keyField) {
- this(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
- this.schema = readerSchema;
- this.keyField = keyField;
+ /**
+ * NOTE: This ctor is used on the write-path (ie when records ought to be written into the log)
+ */
+ protected HoodieDataBlock(Option<byte[]> content,
+ FSDataInputStream inputStream,
+ boolean readBlockLazily,
+ Option<HoodieLogBlockContentLocation> blockContentLocation,
+ Option<Schema> readerSchema,
+ Map<HeaderMetadataType, String> headers,
+ Map<HeaderMetadataType, String> footer,
+ String keyFieldRef,
+ boolean enablePointLookups) {
+ super(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
+ this.records = null;
Review comment:
Can we use `Option` instead of `null`?
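Something like this, roughly (a sketch only; names mirror the diff but this is not the actual class):
```java
import java.util.List;

import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.util.Option;

abstract class DataBlockSketch {
  // Option makes the "not materialized yet" state explicit instead of relying on null
  private Option<List<IndexedRecord>> records = Option.empty();

  // read path: records get materialized lazily
  protected DataBlockSketch() {
  }

  // write path: records are handed in up front
  protected DataBlockSketch(List<IndexedRecord> records) {
    this.records = Option.of(records);
  }

  public final List<IndexedRecord> getRecords() {
    if (!records.isPresent()) {
      records = Option.of(readRecordsFromBlockPayload());
    }
    return records.get();
  }

  protected abstract List<IndexedRecord> readRecordsFromBlockPayload();
}
```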
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java
##########
@@ -74,40 +75,49 @@ public static Path getInlineFilePath(Path outerPath, String origScheme, long inL
* @return Outer file Path from the InLineFS Path
*/
public static Path getOuterFilePathFromInlinePath(Path inlineFSPath) {
- final String scheme = inlineFSPath.getParent().getName();
+ assertInlineFSPath(inlineFSPath);
+
+ final String baseFileScheme = inlineFSPath.getParent().getName();
final Path basePath = inlineFSPath.getParent().getParent();
- ValidationUtils.checkArgument(basePath.toString().contains(SCHEME_SEPARATOR),
- "Invalid InLineFSPath: " + inlineFSPath);
+ checkArgument(
+ basePath.toString().contains(SCHEME_SEPARATOR),
Review comment:
Please unwrap this line (and similar ones everywhere) to keep things consistent with the existing formatting?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
##########
@@ -36,50 +29,67 @@
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hudi.common.fs.SizeAwareDataInputStream;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
-import javax.annotation.Nonnull;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* HoodieAvroDataBlock contains a list of records serialized using Avro. It is used with the Parquet base file format.
*/
public class HoodieAvroDataBlock extends HoodieDataBlock {
- private ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
- private ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
-
- public HoodieAvroDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
- @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
- @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
- FSDataInputStream inputStream, boolean readBlockLazily) {
- super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
- }
-
- public HoodieAvroDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
- boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
- Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer, String keyField) {
+ private final ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
+ private final ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
+
+ public HoodieAvroDataBlock(
+ HoodieLogFile logFile,
+ FSDataInputStream inputStream,
+ Option<byte[]> content,
+ boolean readBlockLazily,
+ long position, long blockSize, long blockEndPos,
+ Option<Schema> readerSchema,
+ Map<HeaderMetadataType, String> header,
+ Map<HeaderMetadataType, String> footer,
+ String keyField) {
super(content, inputStream, readBlockLazily,
- Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
- footer, keyField);
+ Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)), readerSchema, header,
+ footer, keyField, false);
}
- public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType,
- String> header, String keyField) {
+ public HoodieAvroDataBlock(
Review comment:
Can we please revert this?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -110,59 +132,97 @@ public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, Lis
@Override
public byte[] getContentBytes() throws IOException {
// In case this method is called before realizing records from content
- if (getContent().isPresent()) {
- return getContent().get();
- } else if (readBlockLazily && !getContent().isPresent() && records == null) {
- // read block lazily
- createRecordsFromContentBytes();
+ Option<byte[]> content = getContent();
+
+ checkState(content.isPresent() || records != null, "Block is in invalid state");
+
+ if (content.isPresent()) {
+ return content.get();
}
- return serializeRecords();
+ return serializeRecords(records);
}
- public abstract HoodieLogBlockType getBlockType();
+ protected static Schema getWriterSchema(Map<HeaderMetadataType, String> logBlockHeader) {
+ return new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA));
+ }
- public List<IndexedRecord> getRecords() {
+ /**
+ * Returns all the records contained w/in this block
+ */
+ public final List<IndexedRecord> getRecords() {
if (records == null) {
try {
// in case records are absent, read content lazily and then convert to IndexedRecords
- createRecordsFromContentBytes();
+ records = readRecordsFromBlockPayload();
} catch (IOException io) {
throw new HoodieIOException("Unable to convert content bytes to records", io);
}
}
return records;
}
+ public Schema getSchema() {
+ return readerSchema;
+ }
+
/**
* Batch get of keys of interest. Implementation can choose to either do full scan and return matched entries or
* do a seek based parsing and return matched entries.
+ *
* @param keys keys of interest.
* @return List of IndexedRecords for the keys of interest.
- * @throws IOException
+ * @throws IOException in case of failures encountered when reading/parsing records
*/
- public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
- throw new UnsupportedOperationException("On demand batch get based on interested keys not supported");
- }
+ public final List<IndexedRecord> getRecords(List<String> keys) throws IOException {
+ boolean fullScan = keys.isEmpty();
+ if (enablePointLookups && !fullScan) {
+ return lookupRecords(keys);
+ }
- public Schema getSchema() {
- // if getSchema was invoked before converting byte [] to records
- if (records == null) {
- getRecords();
+ // Otherwise, we fetch all the records and filter out all the records, but the
+ // ones requested
+ List<IndexedRecord> allRecords = getRecords();
+ if (fullScan) {
+ return allRecords;
}
- return schema;
+
+ HashSet<String> keySet = new HashSet<>(keys);
+ return allRecords.stream()
+ .filter(record -> keySet.contains(getRecordKey(record)))
+ .collect(Collectors.toList());
}
- protected void createRecordsFromContentBytes() throws IOException {
+ protected List<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
if (readBlockLazily && !getContent().isPresent()) {
// read log block contents from disk
inflate();
}
- deserializeRecords();
+ try {
+ return deserializeRecords(getContent().get());
+ } finally {
+ // Free up content to be GC'd by deflating the block
+ deflate();
+ }
+ }
+
+ protected List<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
+ throw new UnsupportedOperationException(
+ String.format("Point-wise records lookups are not supported by this Data block type (%s)", getBlockType())
Review comment:
nit: point lookups are not ...
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
##########
@@ -93,43 +105,48 @@ public HoodieLogBlockType getBlockType() {
}
@Override
- protected byte[] serializeRecords() throws IOException {
- HFileContext context = new HFileContextBuilder().withBlockSize(blockSize).withCompression(compressionAlgorithm)
+ protected byte[] serializeRecords(List<IndexedRecord> records) throws IOException {
+ HFileContext context = new HFileContextBuilder()
+ .withBlockSize(DEFAULT_BLOCK_SIZE)
+ // TODO fetch value from the config
+ .withCompression(DEFAULT_COMPRESSION_ALGO)
Review comment:
Can we fetch this from the HFile config in this PR itself?
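Roughly what I have in mind (a sketch; how the compression name gets plumbed into the block is the open part, while the HBase builder calls are the existing API):
```java
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

// compressionAlgoName would come from the writer/storage config, plumbed into the block's ctor
private static HFileContext buildContext(int blockSize, String compressionAlgoName) {
  Compression.Algorithm algo = Compression.Algorithm.valueOf(compressionAlgoName.toUpperCase());
  return new HFileContextBuilder()
      .withBlockSize(blockSize)
      .withCompression(algo)
      .build();
}
```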
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
##########
@@ -36,50 +29,63 @@
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.SizeAwareDataInputStream;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nonnull;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* HoodieAvroDataBlock contains a list of records serialized using Avro. It is used with the Parquet base file format.
*/
public class HoodieAvroDataBlock extends HoodieDataBlock {
- private ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
- private ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
-
- public HoodieAvroDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
- @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
- @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
- FSDataInputStream inputStream, boolean readBlockLazily) {
- super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
- }
-
- public HoodieAvroDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
- boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
- Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer, String keyField) {
+ private final ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
+ private final ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
+
+ public HoodieAvroDataBlock(
+ HoodieLogFile logFile,
Review comment:
Can we please stick to how the current methods format their arguments: wrapped and indented after the method's `(`?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -110,59 +132,97 @@ public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, Lis
@Override
public byte[] getContentBytes() throws IOException {
// In case this method is called before realizing records from content
- if (getContent().isPresent()) {
- return getContent().get();
- } else if (readBlockLazily && !getContent().isPresent() && records == null) {
- // read block lazily
- createRecordsFromContentBytes();
+ Option<byte[]> content = getContent();
+
+ checkState(content.isPresent() || records != null, "Block is in invalid state");
+
+ if (content.isPresent()) {
+ return content.get();
}
- return serializeRecords();
+ return serializeRecords(records);
}
- public abstract HoodieLogBlockType getBlockType();
+ protected static Schema getWriterSchema(Map<HeaderMetadataType, String> logBlockHeader) {
+ return new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA));
+ }
- public List<IndexedRecord> getRecords() {
+ /**
+ * Returns all the records contained w/in this block
+ */
+ public final List<IndexedRecord> getRecords() {
if (records == null) {
try {
// in case records are absent, read content lazily and then convert to IndexedRecords
- createRecordsFromContentBytes();
+ records = readRecordsFromBlockPayload();
} catch (IOException io) {
throw new HoodieIOException("Unable to convert content bytes to records", io);
}
}
return records;
}
+ public Schema getSchema() {
+ return readerSchema;
+ }
+
/**
* Batch get of keys of interest. Implementation can choose to either do full scan and return matched entries or
* do a seek based parsing and return matched entries.
+ *
* @param keys keys of interest.
* @return List of IndexedRecords for the keys of interest.
- * @throws IOException
+ * @throws IOException in case of failures encountered when reading/parsing records
*/
- public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
- throw new UnsupportedOperationException("On demand batch get based on interested keys not supported");
- }
+ public final List<IndexedRecord> getRecords(List<String> keys) throws IOException {
+ boolean fullScan = keys.isEmpty();
+ if (enablePointLookups && !fullScan) {
+ return lookupRecords(keys);
+ }
- public Schema getSchema() {
- // if getSchema was invoked before converting byte [] to records
- if (records == null) {
- getRecords();
+ // Otherwise, we fetch all the records and filter out all the records, but the
+ // ones requested
+ List<IndexedRecord> allRecords = getRecords();
+ if (fullScan) {
+ return allRecords;
}
- return schema;
+
+ HashSet<String> keySet = new HashSet<>(keys);
+ return allRecords.stream()
+ .filter(record -> keySet.contains(getRecordKey(record)))
+ .collect(Collectors.toList());
}
- protected void createRecordsFromContentBytes() throws IOException {
+ protected List<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
if (readBlockLazily && !getContent().isPresent()) {
// read log block contents from disk
inflate();
}
- deserializeRecords();
+ try {
+ return deserializeRecords(getContent().get());
+ } finally {
+ // Free up content to be GC'd by deflating the block
+ deflate();
+ }
+ }
+
+ protected List<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
+ throw new UnsupportedOperationException(
+ String.format("Point-wise records lookups are not supported by this Data block type (%s)", getBlockType())
+ );
}
- protected abstract byte[] serializeRecords() throws IOException;
+ protected abstract byte[] serializeRecords(List<IndexedRecord> records) throws IOException;
+
+ protected abstract List<IndexedRecord> deserializeRecords(byte[] content) throws IOException;
- protected abstract void deserializeRecords() throws IOException;
+ public abstract HoodieLogBlockType getBlockType();
+
+ protected String getRecordKey(IndexedRecord record) {
Review comment:
Wondering if there is a reusable helper for this. How would this work for different key generators? We seem to assume there is a single key field, which is always true when Hudi metadata is enabled. Something to think about.
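For illustration, the kind of helper I mean (names here are hypothetical) still assumes the key lives in a single field, so complex/virtual key generator setups would need a different path:
```java
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;

final class RecordKeys {
  private RecordKeys() {
  }

  // Works only when the key lives in a single field; falls back to the populated
  // _hoodie_record_key metadata column when the configured field is absent.
  static Option<String> getRecordKey(IndexedRecord record, String keyFieldName) {
    Schema.Field keyField = record.getSchema().getField(keyFieldName);
    if (keyField == null) {
      keyField = record.getSchema().getField(HoodieRecord.RECORD_KEY_METADATA_FIELD);
    }
    if (keyField == null) {
      return Option.empty();
    }
    Object value = record.get(keyField.pos());
    return value == null ? Option.empty() : Option.of(value.toString());
  }
}
```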