vinothchandar commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r774314975
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -88,76 +92,24 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc
   }
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
-                             boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading,
+                             boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
                              String keyField) throws IOException {
-    FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
-    this.logFile = logFile;
-    this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
+    // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
+    //       is prefixed with an appropriate scheme given that we're not propagating the FS
+    //       further
+    this.logFile = new HoodieLogFile(fs.getFileStatus(logFile.getPath()));
Review comment:
Let's get rid of the extra fs.getFileStatus call. It's an extra RPC to S3/HDFS.
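Something along these lines should avoid the round trip (untested sketch; assumes `HoodieLogFile` can be built from a qualified `Path` alone, and that the reverse-reader path keeps using the caller-provided file size):

```java
// Sketch: qualify the path client-side instead of issuing a getFileStatus() RPC.
// FileSystem#makeQualified resolves scheme/authority locally, without touching S3/HDFS.
this.logFile = new HoodieLogFile(fs.makeQualified(logFile.getPath()));
this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize);
```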
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -481,4 +442,59 @@ public long moveToPrev() throws IOException {
   public void remove() {
     throw new UnsupportedOperationException("Remove not supported for HoodieLogFileReader");
   }
+
+  /**
+   * Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams.
+   * @param fs instance of {@link FileSystem} in use.
+   * @param bufferSize buffer size to be used.
+   * @return the right {@link FSDataInputStream} as required.
+   */
+  private static FSDataInputStream getFSDataInputStream(FileSystem fs,
Review comment:
Has any code changed here?
Please refrain from relocating code in the same PR that also changes/rewrites actual logic. If you must, then annotate the PR with comments letting the reviewer know what's going on.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -288,28 +288,14 @@ public synchronized void close() {
     }
   }
-  static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {
+  static class SeekableByteArrayInputStream extends ByteBufferBackedInputStream implements Seekable, PositionedReadable {
Review comment:
Something that changes HFile reading in general should go in its own PR?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -72,7 +76,7 @@
private long reverseLogFilePosition;
private long lastReverseLogFilePosition;
private boolean reverseReader;
- private boolean enableInlineReading;
+ private boolean enableRecordLookups;
Review comment:
Wondering if there are actually two separate concepts here.
inline reading - where we use a Parquet reader or HFile reader to read block content, instead of reading it out as a byte[]
record lookups - whether we scan and filter the keys of interest, or push the lookups down to the reader/file format.
I feel the first one is what the log reader should care about. We should push the record lookups into the file reader?
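Roughly the split I have in mind (purely illustrative sketch, none of these names exist today):

```java
// Illustrative sketch only - hypothetical names, not actual Hudi APIs.

// 1. The log reader only decides HOW a block's content is materialized:
enum BlockContentAccess {
  EAGER_BYTES,    // read the whole block into a byte[] (current default path)
  INLINE_READER   // hand the block range to a format-aware reader over InlineFS
}

// 2. Key-based filtering is pushed down into the format reader itself:
interface RecordLookupCapable {
  // returns only the records whose keys are in sortedKeys
  java.util.List<org.apache.avro.generic.IndexedRecord> lookupRecords(java.util.List<String> sortedKeys)
      throws java.io.IOException;
}
```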
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -88,76 +92,24 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc
   }
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
-                             boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading,
+                             boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
                              String keyField) throws IOException {
-    FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
-    this.logFile = logFile;
-    this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
+    // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
+    //       is prefixed with an appropriate scheme given that we're not propagating the FS
+    //       further
+    this.logFile = new HoodieLogFile(fs.getFileStatus(logFile.getPath()));
Review comment:
In general, there should be no direct list or status fetches here. We removed all of them with 0.9.0.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -208,71 +155,85 @@ private HoodieLogBlock readBlock() throws IOException {
     HoodieLogFormat.LogFormatVersion nextBlockVersion = readVersion();
     // 3. Read the block type for a log block
-    if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
-      type = inputStream.readInt();
-
-      ValidationUtils.checkArgument(type < HoodieLogBlockType.values().length, "Invalid block byte type found " + type);
-      blockType = HoodieLogBlockType.values()[type];
-    }
+    HoodieLogBlockType blockType = tryReadBlockType(nextBlockVersion);
     // 4. Read the header for a log block, if present
-    if (nextBlockVersion.hasHeader()) {
-      header = HoodieLogBlock.getLogMetadata(inputStream);
-    }
-    int contentLength = blocksize;
+    Map<HeaderMetadataType, String> header =
+        nextBlockVersion.hasHeader() ? HoodieLogBlock.getLogMetadata(inputStream) : null;
+
     // 5. Read the content length for the content
-    if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
-      contentLength = (int) inputStream.readLong();
-    }
+    // Fallback to full-block size if no content-length
+    // TODO replace w/ hasContentLength
+    int contentLength =
+        nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION ? (int) inputStream.readLong() : blockSize;
     // 6. Read the content or skip content based on IO vs Memory trade-off by client
-    // TODO - have a max block size and reuse this buffer in the ByteBuffer
-    // (hard to guess max block size for now)
     long contentPosition = inputStream.getPos();
-    byte[] content = HoodieLogBlock.readOrSkipContent(inputStream, contentLength, readBlockLazily);
+    boolean shouldReadLazily = readBlockLazily && nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION;
+    Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, contentLength, shouldReadLazily);
     // 7. Read footer if any
-    Map<HeaderMetadataType, String> footer = null;
-    if (nextBlockVersion.hasFooter()) {
-      footer = HoodieLogBlock.getLogMetadata(inputStream);
-    }
+    Map<HeaderMetadataType, String> footer =
+        nextBlockVersion.hasFooter() ? HoodieLogBlock.getLogMetadata(inputStream) : null;
     // 8. Read log block length, if present. This acts as a reverse pointer when traversing a
     // log file in reverse
-    @SuppressWarnings("unused")
-    long logBlockLength = 0;
     if (nextBlockVersion.hasLogBlockLength()) {
-      logBlockLength = inputStream.readLong();
+      inputStream.readLong();
     }
     // 9. Read the log block end position in the log file
     long blockEndPos = inputStream.getPos();
     switch (Objects.requireNonNull(blockType)) {
-      // based on type read the block
       case AVRO_DATA_BLOCK:
         if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
-          return HoodieAvroDataBlock.getBlock(content, readerSchema);
+          return HoodieAvroDataBlock.getBlock(content.get(), readerSchema);
         } else {
-          return new HoodieAvroDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
-              contentPosition, contentLength, blockEndPos, readerSchema, header, footer, keyField);
+          return new HoodieAvroDataBlock(logFile, inputStream, content, readBlockLazily,
+              contentPosition, contentLength, blockEndPos, Option.ofNullable(readerSchema), header, footer, keyField);
         }
+
       case HFILE_DATA_BLOCK:
-        return new HoodieHFileDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
-            contentPosition, contentLength, blockEndPos, readerSchema,
-            header, footer, enableInlineReading, keyField);
+        checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
+            String.format("HFile block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
+
+        return new HoodieHFileDataBlock(logFile, inputStream, content, readBlockLazily,
+            contentPosition, contentLength, blockEndPos, Option.ofNullable(readerSchema),
+            header, footer, keyField, enableRecordLookups);
+
+      case PARQUET_DATA_BLOCK:
+        checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
+            String.format("Parquet block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
+
+        return new HoodieParquetDataBlock(logFile, inputStream, content, readBlockLazily,
+            contentPosition, contentLength, blockEndPos, Option.ofNullable(readerSchema), header, footer, keyField);
+
       case DELETE_BLOCK:
-        return HoodieDeleteBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
+        return HoodieDeleteBlock.getBlock(logFile, inputStream, content, readBlockLazily,
             contentPosition, contentLength, blockEndPos, header, footer);
+
       case COMMAND_BLOCK:
-        return HoodieCommandBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
+        return HoodieCommandBlock.getBlock(logFile, inputStream, content, readBlockLazily,
             contentPosition, contentLength, blockEndPos, header, footer);
+
       default:
         throw new HoodieNotSupportedException("Unsupported Block " + blockType);
     }
   }
+  @Nullable
+  private HoodieLogBlockType tryReadBlockType(HoodieLogFormat.LogFormatVersion blockVersion) throws IOException {
+    if (blockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
+      return null;
Review comment:
Using null as a sentinel here makes one nervous. Can we use Option?
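e.g. roughly (untested sketch; callers would then branch on `isPresent()` instead of null-checking):

```java
private Option<HoodieLogBlockType> tryReadBlockType(HoodieLogFormat.LogFormatVersion blockVersion) throws IOException {
  if (blockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
    // pre-versioned (magic-only) blocks don't encode a block type
    return Option.empty();
  }
  int type = inputStream.readInt();
  ValidationUtils.checkArgument(type < HoodieLogBlockType.values().length,
      "Invalid block byte type found " + type);
  return Option.of(HoodieLogBlockType.values()[type]);
}
```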
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -208,71 +155,85 @@ private HoodieLogBlock readBlock() throws IOException {
     HoodieLogFormat.LogFormatVersion nextBlockVersion = readVersion();
     // 3. Read the block type for a log block
-    if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
-      type = inputStream.readInt();
-
-      ValidationUtils.checkArgument(type < HoodieLogBlockType.values().length, "Invalid block byte type found " + type);
-      blockType = HoodieLogBlockType.values()[type];
-    }
+    HoodieLogBlockType blockType = tryReadBlockType(nextBlockVersion);
     // 4. Read the header for a log block, if present
-    if (nextBlockVersion.hasHeader()) {
-      header = HoodieLogBlock.getLogMetadata(inputStream);
-    }
-    int contentLength = blocksize;
+    Map<HeaderMetadataType, String> header =
+        nextBlockVersion.hasHeader() ? HoodieLogBlock.getLogMetadata(inputStream) : null;
+
     // 5. Read the content length for the content
-    if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
-      contentLength = (int) inputStream.readLong();
-    }
+    // Fallback to full-block size if no content-length
+    // TODO replace w/ hasContentLength
+    int contentLength =
+        nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION ? (int) inputStream.readLong() : blockSize;
     // 6. Read the content or skip content based on IO vs Memory trade-off by client
-    // TODO - have a max block size and reuse this buffer in the ByteBuffer
-    // (hard to guess max block size for now)
     long contentPosition = inputStream.getPos();
-    byte[] content = HoodieLogBlock.readOrSkipContent(inputStream, contentLength, readBlockLazily);
+    boolean shouldReadLazily = readBlockLazily && nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION;
+    Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, contentLength, shouldReadLazily);
     // 7. Read footer if any
-    Map<HeaderMetadataType, String> footer = null;
-    if (nextBlockVersion.hasFooter()) {
-      footer = HoodieLogBlock.getLogMetadata(inputStream);
-    }
+    Map<HeaderMetadataType, String> footer =
Review comment:
Can we work with an empty map instead of null?
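i.e. (sketch, using `java.util.Collections`):

```java
// Sketch: never hand a null header/footer map to the block constructors
Map<HeaderMetadataType, String> header =
    nextBlockVersion.hasHeader() ? HoodieLogBlock.getLogMetadata(inputStream) : Collections.emptyMap();
Map<HeaderMetadataType, String> footer =
    nextBlockVersion.hasFooter() ? HoodieLogBlock.getLogMetadata(inputStream) : Collections.emptyMap();
```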
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
##########
@@ -109,7 +116,25 @@ public long getLogBlockLength() {
* Type of the log block WARNING: This enum is serialized as the ordinal.
Only add new enums at the end.
*/
public enum HoodieLogBlockType {
-  COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, HFILE_DATA_BLOCK
+ COMMAND_BLOCK(":command"),
+ DELETE_BLOCK(":delete"),
+ CORRUPT_BLOCK(":corrupted"),
Review comment:
Rename to "corrupt"; it's a valid adjective. Match the enum name?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
##########
@@ -58,24 +57,37 @@
import java.util.TreeMap;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
/**
* HoodieHFileDataBlock contains a list of records stored inside an HFile
format. It is used with the HFile
* base file format.
*/
public class HoodieHFileDataBlock extends HoodieDataBlock {
   private static final Logger LOG = LogManager.getLogger(HoodieHFileDataBlock.class);
-  private static Compression.Algorithm compressionAlgorithm = Compression.Algorithm.GZ;
-  private static int blockSize = 1 * 1024 * 1024;
-  private boolean enableInlineReading = false;
-
-  public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
-                              boolean readBlockLazily, long position, long blockSize, long blockEndpos,
-                              Schema readerSchema, Map<HeaderMetadataType, String> header,
-                              Map<HeaderMetadataType, String> footer, boolean enableInlineReading, String keyField) {
-    super(content, inputStream, readBlockLazily,
-        Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)),
-        readerSchema, header, footer, keyField);
-    this.enableInlineReading = enableInlineReading;
+
+  private static final int DEFAULT_BLOCK_SIZE = 1024 * 1024;
+  private static final Compression.Algorithm DEFAULT_COMPRESSION_ALGO = Compression.Algorithm.GZ;
+
+  public HoodieHFileDataBlock(HoodieLogFile logFile,
Review comment:
Let's revert to the wrapped style.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -88,76 +92,24 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc
   }
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
-                             boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading,
+                             boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
                              String keyField) throws IOException {
-    FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
-    this.logFile = logFile;
-    this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
+    // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
+    //       is prefixed with an appropriate scheme given that we're not propagating the FS
+    //       further
+    this.logFile = new HoodieLogFile(fs.getFileStatus(logFile.getPath()));
+    this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize);
     this.readerSchema = readerSchema;
     this.readBlockLazily = readBlockLazily;
     this.reverseReader = reverseReader;
-    this.enableInlineReading = enableInlineReading;
+    this.enableRecordLookups = enableRecordLookups;
     this.keyField = keyField;
     if (this.reverseReader) {
-      this.reverseLogFilePosition = this.lastReverseLogFilePosition = logFile.getFileSize();
+      this.reverseLogFilePosition = this.lastReverseLogFilePosition = this.logFile.getFileSize();
     }
     addShutDownHook();
   }
-  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException {
-    this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false);
-  }
-
-  /**
-   * Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams.
-   * @param fsDataInputStream original instance of {@link FSDataInputStream}.
-   * @param fs instance of {@link FileSystem} in use.
-   * @param bufferSize buffer size to be used.
-   * @return the right {@link FSDataInputStream} as required.
-   */
-  private FSDataInputStream getFSDataInputStream(FSDataInputStream fsDataInputStream, FileSystem fs, int bufferSize) {
Review comment:
Where has all this been moved to? (note to self)
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
##########
@@ -148,65 +165,40 @@ public HoodieLogBlockType getBlockType() {
}
@Override
- protected void createRecordsFromContentBytes() throws IOException {
- if (enableInlineReading) {
- getRecords(Collections.emptyList());
- } else {
- super.createRecordsFromContentBytes();
- }
- }
-
- @Override
- public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
- readWithInlineFS(keys);
- return records;
- }
-
- private void readWithInlineFS(List<String> keys) throws IOException {
- boolean enableFullScan = keys.isEmpty();
- // Get schema from the header
- Schema writerSchema = new
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
- // If readerSchema was not present, use writerSchema
- if (schema == null) {
- schema = writerSchema;
- }
- Configuration conf = new Configuration();
- CacheConfig cacheConf = new CacheConfig(conf);
+ protected List<IndexedRecord> lookupRecords(List<String> keys) throws
IOException {
Configuration inlineConf = new Configuration();
inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl",
InLineFileSystem.class.getName());
+ HoodieLogBlockContentLocation blockContentLoc =
getBlockContentLocation().get();
+
Path inlinePath = InLineFSUtils.getInlineFilePath(
- getBlockContentLocation().get().getLogFile().getPath(),
-
getBlockContentLocation().get().getLogFile().getPath().getFileSystem(conf).getScheme(),
- getBlockContentLocation().get().getContentPositionInLogFile(),
- getBlockContentLocation().get().getBlockSize());
- if (!enableFullScan) {
- // HFile read will be efficient if keys are sorted, since on storage,
records are sorted by key. This will avoid unnecessary seeks.
- Collections.sort(keys);
+ blockContentLoc.getLogFile().getPath(),
+
blockContentLoc.getLogFile().getPath().getFileSystem(inlineConf).getScheme(),
+ blockContentLoc.getContentPositionInLogFile(),
+ blockContentLoc.getBlockSize());
+
+ // HFile read will be efficient if keys are sorted, since on storage,
records are sorted by key. This will avoid unnecessary seeks.
+ Collections.sort(keys);
+
+ try (HoodieHFileReader<IndexedRecord> reader =
+ new HoodieHFileReader<>(inlineConf, inlinePath, new
CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf))) {
+ // Get writer's schema from the header
+ List<Pair<String, IndexedRecord>> logRecords = reader.readRecords(keys,
readerSchema);
+ return
logRecords.stream().map(Pair::getSecond).collect(Collectors.toList());
}
- HoodieHFileReader reader = new HoodieHFileReader(inlineConf, inlinePath,
cacheConf, inlinePath.getFileSystem(inlineConf));
- List<org.apache.hadoop.hbase.util.Pair<String, IndexedRecord>> logRecords
= enableFullScan ? reader.readAllRecords(writerSchema, schema) :
Review comment:
We should just be using our own Pair.
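i.e. `org.apache.hudi.common.util.collection.Pair`, so the HBase type doesn't leak into the block layer (sketch; assumes `HoodieHFileReader#readRecords` is changed to return our Pair):

```java
import org.apache.hudi.common.util.collection.Pair;

// Sketch: map over Hudi's own Pair instead of org.apache.hadoop.hbase.util.Pair
List<Pair<String, IndexedRecord>> logRecords = reader.readRecords(keys, readerSchema);
return logRecords.stream()
    .map(Pair::getRight)
    .collect(Collectors.toList());
```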
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
##########
@@ -148,65 +165,40 @@ public HoodieLogBlockType getBlockType() {
}
@Override
- protected void createRecordsFromContentBytes() throws IOException {
- if (enableInlineReading) {
- getRecords(Collections.emptyList());
- } else {
- super.createRecordsFromContentBytes();
- }
- }
-
- @Override
- public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
- readWithInlineFS(keys);
- return records;
- }
-
- private void readWithInlineFS(List<String> keys) throws IOException {
- boolean enableFullScan = keys.isEmpty();
- // Get schema from the header
- Schema writerSchema = new
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
- // If readerSchema was not present, use writerSchema
- if (schema == null) {
- schema = writerSchema;
- }
- Configuration conf = new Configuration();
- CacheConfig cacheConf = new CacheConfig(conf);
+ protected List<IndexedRecord> lookupRecords(List<String> keys) throws
IOException {
Configuration inlineConf = new Configuration();
inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl",
InLineFileSystem.class.getName());
+ HoodieLogBlockContentLocation blockContentLoc =
getBlockContentLocation().get();
+
Path inlinePath = InLineFSUtils.getInlineFilePath(
- getBlockContentLocation().get().getLogFile().getPath(),
-
getBlockContentLocation().get().getLogFile().getPath().getFileSystem(conf).getScheme(),
- getBlockContentLocation().get().getContentPositionInLogFile(),
- getBlockContentLocation().get().getBlockSize());
- if (!enableFullScan) {
- // HFile read will be efficient if keys are sorted, since on storage,
records are sorted by key. This will avoid unnecessary seeks.
- Collections.sort(keys);
+ blockContentLoc.getLogFile().getPath(),
+
blockContentLoc.getLogFile().getPath().getFileSystem(inlineConf).getScheme(),
+ blockContentLoc.getContentPositionInLogFile(),
+ blockContentLoc.getBlockSize());
+
+ // HFile read will be efficient if keys are sorted, since on storage,
records are sorted by key. This will avoid unnecessary seeks.
+ Collections.sort(keys);
+
+ try (HoodieHFileReader<IndexedRecord> reader =
+ new HoodieHFileReader<>(inlineConf, inlinePath, new
CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf))) {
+ // Get writer's schema from the header
+ List<Pair<String, IndexedRecord>> logRecords = reader.readRecords(keys,
readerSchema);
+ return
logRecords.stream().map(Pair::getSecond).collect(Collectors.toList());
}
- HoodieHFileReader reader = new HoodieHFileReader(inlineConf, inlinePath,
cacheConf, inlinePath.getFileSystem(inlineConf));
- List<org.apache.hadoop.hbase.util.Pair<String, IndexedRecord>> logRecords
= enableFullScan ? reader.readAllRecords(writerSchema, schema) :
- reader.readRecords(keys, schema);
- reader.close();
- this.records = logRecords.stream().map(t ->
t.getSecond()).collect(Collectors.toList());
}
@Override
- protected void deserializeRecords() throws IOException {
+ protected List<IndexedRecord> deserializeRecords(byte[] content) throws
IOException {
+ checkState(readerSchema != null, "Reader's schema has to be non-null");
Review comment:
Is this because of a change in this PR? The original code seems to expect a null reader schema at times.
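For reference, the old code fell back to the writer's schema from the block header when no reader schema was supplied; if that behaviour still matters, something like (sketch):

```java
// Sketch: preserve the old fallback instead of hard-failing on a null reader schema
Schema writerSchema = new Schema.Parser().parse(getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
Schema schema = readerSchema != null ? readerSchema : writerSchema;
```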
##########
File path:
hudi-common/src/test/java/org/apache/hudi/common/testutils/HadoopMapRedUtils.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hudi.common.util.Option;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HadoopMapRedUtils {
Review comment:
Could we rewrite the tests in a different way, rather than relying on the mapred classes? Generally, they are a world of pain as we test across Hadoop versions.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
+import org.apache.hudi.common.fs.inline.InLineFileSystem;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+ public HoodieParquetDataBlock(HoodieLogFile logFile,
+ FSDataInputStream inputStream,
+ Option<byte[]> content,
+ boolean readBlockLazily,
+ long position, long blockSize, long
blockEndPos,
+ Option<Schema> readerSchema,
+ Map<HeaderMetadataType, String> header,
+ Map<HeaderMetadataType, String> footer,
+ String keyField) {
+ super(content,
+ inputStream,
+ readBlockLazily,
+ Option.of(new HoodieLogBlockContentLocation(logFile, position,
blockSize, blockEndPos)),
+ readerSchema,
+ header,
+ footer,
+ keyField,
+ false);
+ }
+
+ public HoodieParquetDataBlock(
+ @Nonnull List<IndexedRecord> records,
+ @Nonnull Map<HeaderMetadataType, String> header,
+ @Nonnull String keyField
+ ) {
+ super(records, header, new HashMap<>(), keyField);
+ }
+
+ public HoodieParquetDataBlock(
+ @Nonnull List<IndexedRecord> records,
+ @Nonnull Map<HeaderMetadataType, String> header
+ ) {
+ super(records, header, new HashMap<>(),
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ }
+
+ @Override
+ public HoodieLogBlockType getBlockType() {
+ return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+ }
+
+ @Override
+ protected byte[] serializeRecords(List<IndexedRecord> records) throws
IOException {
+ if (records.size() == 0) {
+ return new byte[0];
+ }
+
+ Schema writerSchema = new
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+
+ HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+ new AvroSchemaConverter().convert(writerSchema), writerSchema,
Option.empty());
+
+ HoodieAvroParquetConfig avroParquetConfig =
+ new HoodieAvroParquetConfig(
+ writeSupport,
+ // TODO fetch compression codec from the config
+ CompressionCodecName.GZIP,
+ ParquetWriter.DEFAULT_BLOCK_SIZE,
+ ParquetWriter.DEFAULT_PAGE_SIZE,
+ 1024 * 1024 * 1024,
+ new Configuration(),
+
Double.parseDouble(String.valueOf(0.1)));//HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {
+ try (HoodieParquetStreamWriter<IndexedRecord> parquetWriter = new
HoodieParquetStreamWriter<>(outputStream, avroParquetConfig)) {
+ for (IndexedRecord record : records) {
+ String recordKey = getRecordKey(record);
+ parquetWriter.writeAvro(recordKey, record);
+ }
+ outputStream.flush();
+ }
+ }
+
+ return baos.toByteArray();
+ }
+
+ public static Iterator<IndexedRecord>
getProjectedParquetRecordsIterator(Configuration conf,
+
Schema readerSchema,
+
InputFile inputFile) throws IOException {
+ AvroReadSupport.setAvroReadSchema(conf, readerSchema);
+ AvroReadSupport.setRequestedProjection(conf, readerSchema);
+ ParquetReader<IndexedRecord> reader =
+
AvroParquetReader.<IndexedRecord>builder(inputFile).withConf(conf).build();
+ return new ParquetReaderIterator<>(reader);
+ }
+
+ /**
+ * NOTE: We're overriding the whole reading sequence to make sure we
properly respect
+ * the requested Reader's schema and only fetch the columns that have
been explicitly
+ * requested by the caller (providing projected Reader's schema)
+ */
+ @Override
+ protected List<IndexedRecord> readRecordsFromBlockPayload() throws
IOException {
+ Configuration inlineConf = new Configuration();
+ inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl",
InLineFileSystem.class.getName());
+
+ HoodieLogBlockContentLocation blockContentLoc =
getBlockContentLocation().get();
+
+ Path inlineLogFilePath = InLineFSUtils.getInlineFilePath(
+ blockContentLoc.getLogFile().getPath(),
+
blockContentLoc.getLogFile().getPath().getFileSystem(inlineConf).getScheme(),
+ blockContentLoc.getContentPositionInLogFile(),
+ blockContentLoc.getBlockSize());
+
+ ArrayList<IndexedRecord> records = new ArrayList<>();
+
+ getProjectedParquetRecordsIterator(
+ inlineConf,
Review comment:
If someone manually sets `spark.sparkContext.hadoopConfiguration.set(x, y)`, say to configure cloud access creds, this is going to be an issue, since `new Configuration()` won't pick that up.
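A minimal fix would be to derive the inline conf from a caller-provided Hadoop conf (sketch; `baseConf` is a hypothetical field wired in from the log reader / table meta client):

```java
// Sketch: copy the caller's conf so creds/settings from
// spark.sparkContext.hadoopConfiguration carry over to the inline FS reads
Configuration inlineConf = new Configuration(baseConf);
inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
```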
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/io/ByteBufferBackedInputStream.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.io;
+
+import javax.annotation.Nonnull;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Instance of {@link InputStream} backed by {@link ByteBuffer}, implementing
following
+ * functionality (on top of what's required by {@link InputStream})
+ *
+ * <ol>
+ * <li>Seeking: enables random access by allowing to seek to an arbitrary
position w/in the stream</li>
+ * <li>(Thread-safe) Copying: enables to copy from the underlying buffer not
modifying the state of the stream</li>
+ * </ol>
+ *
+ * NOTE: Generally methods of this class are NOT thread-safe, unless specified
otherwise
+ */
+public class ByteBufferBackedInputStream extends InputStream {
Review comment:
Can we unit-test this class?
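e.g. something along these lines (sketch; I'm assuming a `byte[]` constructor and a `seek(long)` method per the class description, so adjust to the actual API):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.junit.jupiter.api.Test;

public class TestByteBufferBackedInputStream {

  @Test
  public void testSequentialReadAndSeek() throws Exception {
    byte[] data = new byte[] {0, 1, 2, 3, 4, 5, 6, 7};
    ByteBufferBackedInputStream stream = new ByteBufferBackedInputStream(data);

    // plain InputStream contract
    assertEquals(0, stream.read());
    assertEquals(1, stream.read());

    byte[] buf = new byte[4];
    assertEquals(4, stream.read(buf, 0, 4));
    assertEquals(2, buf[0]);

    // random access (assumed API)
    stream.seek(6);
    assertEquals(6, stream.read());
  }
}
```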
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
##########
@@ -109,7 +116,25 @@ public long getLogBlockLength() {
* Type of the log block WARNING: This enum is serialized as the ordinal.
Only add new enums at the end.
*/
public enum HoodieLogBlockType {
-  COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, HFILE_DATA_BLOCK
+ COMMAND_BLOCK(":command"),
Review comment:
Why the `:`? Why do we need the id?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
+import org.apache.hudi.common.fs.inline.InLineFileSystem;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+ public HoodieParquetDataBlock(HoodieLogFile logFile,
+ FSDataInputStream inputStream,
+ Option<byte[]> content,
+ boolean readBlockLazily,
+ long position, long blockSize, long
blockEndPos,
+ Option<Schema> readerSchema,
+ Map<HeaderMetadataType, String> header,
+ Map<HeaderMetadataType, String> footer,
+ String keyField) {
+ super(content,
+ inputStream,
+ readBlockLazily,
+ Option.of(new HoodieLogBlockContentLocation(logFile, position,
blockSize, blockEndPos)),
+ readerSchema,
+ header,
+ footer,
+ keyField,
+ false);
+ }
+
+ public HoodieParquetDataBlock(
+ @Nonnull List<IndexedRecord> records,
+ @Nonnull Map<HeaderMetadataType, String> header,
+ @Nonnull String keyField
+ ) {
+ super(records, header, new HashMap<>(), keyField);
+ }
+
+ public HoodieParquetDataBlock(
+ @Nonnull List<IndexedRecord> records,
+ @Nonnull Map<HeaderMetadataType, String> header
+ ) {
+ super(records, header, new HashMap<>(),
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ }
+
+ @Override
+ public HoodieLogBlockType getBlockType() {
+ return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+ }
+
+ @Override
+ protected byte[] serializeRecords(List<IndexedRecord> records) throws
IOException {
+ if (records.size() == 0) {
+ return new byte[0];
+ }
+
+ Schema writerSchema = new
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+
+ HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+ new AvroSchemaConverter().convert(writerSchema), writerSchema,
Option.empty());
+
+ HoodieAvroParquetConfig avroParquetConfig =
+ new HoodieAvroParquetConfig(
+ writeSupport,
+ // TODO fetch compression codec from the config
+ CompressionCodecName.GZIP,
+ ParquetWriter.DEFAULT_BLOCK_SIZE,
+ ParquetWriter.DEFAULT_PAGE_SIZE,
+ 1024 * 1024 * 1024,
+ new Configuration(),
+
Double.parseDouble(String.valueOf(0.1)));//HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {
+ try (HoodieParquetStreamWriter<IndexedRecord> parquetWriter = new
HoodieParquetStreamWriter<>(outputStream, avroParquetConfig)) {
+ for (IndexedRecord record : records) {
+ String recordKey = getRecordKey(record);
+ parquetWriter.writeAvro(recordKey, record);
+ }
+ outputStream.flush();
+ }
+ }
+
+ return baos.toByteArray();
+ }
+
+ public static Iterator<IndexedRecord>
getProjectedParquetRecordsIterator(Configuration conf,
+
Schema readerSchema,
+
InputFile inputFile) throws IOException {
+ AvroReadSupport.setAvroReadSchema(conf, readerSchema);
+ AvroReadSupport.setRequestedProjection(conf, readerSchema);
+ ParquetReader<IndexedRecord> reader =
+
AvroParquetReader.<IndexedRecord>builder(inputFile).withConf(conf).build();
+ return new ParquetReaderIterator<>(reader);
+ }
+
+ /**
+ * NOTE: We're overriding the whole reading sequence to make sure we
properly respect
+ * the requested Reader's schema and only fetch the columns that have
been explicitly
+ * requested by the caller (providing projected Reader's schema)
+ */
+ @Override
+ protected List<IndexedRecord> readRecordsFromBlockPayload() throws
IOException {
+ Configuration inlineConf = new Configuration();
+ inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl",
InLineFileSystem.class.getName());
+
+ HoodieLogBlockContentLocation blockContentLoc =
getBlockContentLocation().get();
+
+ Path inlineLogFilePath = InLineFSUtils.getInlineFilePath(
+ blockContentLoc.getLogFile().getPath(),
+
blockContentLoc.getLogFile().getPath().getFileSystem(inlineConf).getScheme(),
+ blockContentLoc.getContentPositionInLogFile(),
+ blockContentLoc.getBlockSize());
+
+ ArrayList<IndexedRecord> records = new ArrayList<>();
+
+ getProjectedParquetRecordsIterator(
+ inlineConf,
Review comment:
+1, given we are cleaning up a bunch here, let's also fix this. We need to wire the config from the TableMetaClient into this layer.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.parquet.io.OutputStreamBackedOutputFile;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.OutputFile;
+
+import java.io.IOException;
+
+// TODO unify w/ HoodieParquetWriter
+public class HoodieParquetStreamWriter<R extends IndexedRecord> implements
AutoCloseable {
Review comment:
Why do we need this separate class? Why not enhance `HoodieParquetWriter`?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/parquet/io/ByteBufferBackedInputFile.java
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.parquet.io;
+
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+/**
+ * Implementation of {@link InputFile} backed by {@code byte[]} buffer
+ */
+public class ByteBufferBackedInputFile implements InputFile {
+ private final byte[] buffer;
+ private final int offset;
+ private final int length;
+
+ public ByteBufferBackedInputFile(byte[] buffer, int offset, int length) {
+ this.buffer = buffer;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ public ByteBufferBackedInputFile(byte[] buffer) {
+ this(buffer, 0, buffer.length);
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
Review comment:
Codecov was misleading, so we removed it. Let's add unit tests for the new files.
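For `ByteBufferBackedInputFile`, even a tiny test over the Parquet `InputFile` contract would help (sketch; assumes stream positions are relative to the (offset, length) window):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.hudi.parquet.io.ByteBufferBackedInputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.junit.jupiter.api.Test;

public class TestByteBufferBackedInputFile {

  @Test
  public void testLengthAndStream() throws Exception {
    byte[] data = new byte[] {10, 20, 30, 40, 50};
    ByteBufferBackedInputFile inputFile = new ByteBufferBackedInputFile(data, 1, 3);

    // length reflects the (offset, length) window, not the whole backing array
    assertEquals(3, inputFile.getLength());

    try (SeekableInputStream in = inputFile.newStream()) {
      assertEquals(20, in.read());  // first byte of the window
      in.seek(2);
      assertEquals(40, in.read());  // last byte of the window
    }
  }
}
```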
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+import org.apache.hudi.parquet.io.ByteBufferBackedInputFile;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.InputFile;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+ public HoodieParquetDataBlock(
+ HoodieLogFile logFile,
+ FSDataInputStream inputStream,
+ Option<byte[]> content,
+ boolean readBlockLazily, long position, long blockSize, long blockEndpos,
+ Option<Schema> readerSchema,
+ Map<HeaderMetadataType, String> header,
+ Map<HeaderMetadataType, String> footer,
+ String keyField
+ ) {
+ super(
+ content,
+ inputStream,
+ readBlockLazily,
+ Option.of(new HoodieLogBlockContentLocation(logFile, position,
blockSize, blockEndpos)),
+ readerSchema,
+ header,
+ footer,
+ keyField,
+ false);
+ }
+
+ public HoodieParquetDataBlock(
+ @Nonnull List<IndexedRecord> records,
+ @Nonnull Map<HeaderMetadataType, String> header,
+ @Nonnull String keyField
+ ) {
+ super(records, header, new HashMap<>(), keyField);
+ }
+
+ public HoodieParquetDataBlock(
+ @Nonnull List<IndexedRecord> records,
+ @Nonnull Map<HeaderMetadataType, String> header
+ ) {
+ super(records, header, new HashMap<>(),
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ }
+
+ @Override
+ public HoodieLogBlockType getBlockType() {
+ return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+ }
+
+ @Override
+ protected byte[] serializeRecords(List<IndexedRecord> records) throws
IOException {
+ if (records.size() == 0) {
+ return new byte[0];
+ }
+
+ Schema writerSchema = new
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+
+ HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+ new AvroSchemaConverter().convert(writerSchema), writerSchema,
Option.empty());
+
+ HoodieAvroParquetConfig avroParquetConfig =
+ new HoodieAvroParquetConfig(
+ writeSupport,
+ // TODO fetch compression codec from the config
+ CompressionCodecName.GZIP,
Review comment:
But let's fix this occurrence in this PR; I don't want it to be buried in a broader effort. Or are all the references you mention around log reader/writer code, so that can be prioritized right away?
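Concretely, the writer-side constructor could just take the codec resolved from the storage config (sketch; the extra parameter and field are hypothetical):

```java
// Sketch: thread the codec through from the writer config instead of hardcoding GZIP here
public HoodieParquetDataBlock(List<IndexedRecord> records,
                              Map<HeaderMetadataType, String> header,
                              String keyField,
                              CompressionCodecName compressionCodec) { // e.g. resolved from the parquet compression storage config
  super(records, header, new HashMap<>(), keyField);
  this.compressionCodec = compressionCodec; // hypothetical field, used by serializeRecords()
}
```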
##########
File path:
hudi-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.parquet.io;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+
+/**
+ * Implementation of the {@link OutputFile} backed by {@link
java.io.OutputStream}
+ */
+public class OutputStreamBackedOutputFile implements OutputFile {
Review comment:
Do we need these to move away from the older Parquet APIs? Can you share more on why we are introducing the input/output file classes?
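My understanding of the motivation (please confirm in the PR description): the newer Parquet API builds readers/writers from `InputFile`/`OutputFile` instead of a Hadoop `Path`, which is what lets us read/write a data block against an in-memory buffer rather than a materialized file, e.g. (sketch; `schema` and `outputStream` assumed in scope):

```java
// Older Path-based API: needs an actual file on a Hadoop FileSystem
ParquetWriter<IndexedRecord> pathBased =
    AvroParquetWriter.<IndexedRecord>builder(new Path("/tmp/block.parquet"))
        .withSchema(schema)
        .build();

// Newer OutputFile-based API: can target any stream, e.g. the byte buffer backing a log block
ParquetWriter<IndexedRecord> streamBased =
    AvroParquetWriter.<IndexedRecord>builder(new OutputStreamBackedOutputFile(outputStream))
        .withSchema(schema)
        .build();
```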
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]