alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r771648072
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
##########
@@ -36,50 +29,63 @@
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.SizeAwareDataInputStream;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nonnull;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* HoodieAvroDataBlock contains a list of records serialized using Avro. It is
used with the Parquet base file format.
*/
public class HoodieAvroDataBlock extends HoodieDataBlock {
- private ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
- private ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
-
- public HoodieAvroDataBlock(@Nonnull Map<HeaderMetadataType, String>
logBlockHeader,
- @Nonnull Map<HeaderMetadataType, String>
logBlockFooter,
- @Nonnull Option<HoodieLogBlockContentLocation>
blockContentLocation, @Nonnull Option<byte[]> content,
- FSDataInputStream inputStream, boolean
readBlockLazily) {
- super(logBlockHeader, logBlockFooter, blockContentLocation, content,
inputStream, readBlockLazily);
- }
-
- public HoodieAvroDataBlock(HoodieLogFile logFile, FSDataInputStream
inputStream, Option<byte[]> content,
- boolean readBlockLazily, long position, long
blockSize, long blockEndpos, Schema readerSchema,
- Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer, String keyField) {
+ private final ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
+ private final ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
+
+ public HoodieAvroDataBlock(
+ HoodieLogFile logFile,
Review comment:
Can you elaborate on which aspect you are referring to — stacking or tabbing?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java
##########
@@ -74,40 +75,50 @@ public static Path getInlineFilePath(Path outerPath, String
origScheme, long inL
* @return Outer file Path from the InLineFS Path
*/
public static Path getOuterFilePathFromInlinePath(Path inlineFSPath) {
- final String scheme = inlineFSPath.getParent().getName();
+ assertInlineFSPath(inlineFSPath);
+
+ final String baseFileScheme = inlineFSPath.getParent().getName();
final Path basePath = inlineFSPath.getParent().getParent();
-
ValidationUtils.checkArgument(basePath.toString().contains(SCHEME_SEPARATOR),
- "Invalid InLineFSPath: " + inlineFSPath);
+ checkArgument(
+ basePath.toString().contains(SCHEME_SEPARATOR),
+ "Invalid InLineFS path: " + inlineFSPath);
final String pathExceptScheme =
basePath.toString().substring(basePath.toString().indexOf(SCHEME_SEPARATOR) +
1);
- final String fullPath = scheme + SCHEME_SEPARATOR
- + (scheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR : "")
+ final String fullPath = baseFileScheme + SCHEME_SEPARATOR
+ + (baseFileScheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR :
"")
+ pathExceptScheme;
return new Path(fullPath);
}
/**
- * Eg input : "inlinefs://file1/s3a/?start_offset=20&length=40".
- * output: 20
+ * Returns start offset w/in the base for the block identified by the given
InlineFS path
*
- * @param inlinePath
- * @return
+ * input: "inlinefs://file1/s3a/?start_offset=20&length=40".
+ * output: 20
*/
- public static int startOffset(Path inlinePath) {
- String[] slices = inlinePath.toString().split("[?&=]");
+ public static int startOffset(Path inlineFSPath) {
+ assertInlineFSPath(inlineFSPath);
+
+ String[] slices = inlineFSPath.toString().split("[?&=]");
return Integer.parseInt(slices[slices.length - 3]);
}
/**
- * Eg input : "inlinefs:/file1/s3a/?start_offset=20&length=40".
- * Output: 40
+ * Returns length of the block (embedded w/in the base file) identified by
the given InlineFS path
*
- * @param inlinePath
- * @return
+ * input: "inlinefs:/file1/s3a/?start_offset=20&length=40".
+ * output: 40
*/
public static int length(Path inlinePath) {
+ assertInlineFSPath(inlinePath);
Review comment:
Responded above
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -110,59 +136,94 @@ public static HoodieLogBlock getBlock(HoodieLogBlockType
logDataBlockFormat, Lis
@Override
public byte[] getContentBytes() throws IOException {
// In case this method is called before realizing records from content
- if (getContent().isPresent()) {
- return getContent().get();
- } else if (readBlockLazily && !getContent().isPresent() && records ==
null) {
- // read block lazily
- createRecordsFromContentBytes();
+ Option<byte[]> content = getContent();
+
+ checkState(content.isPresent() || records != null, "Block is in invalid
state");
Review comment:
We are either reading the block from the log (in which case we have the
content, on the read-path) or writing records into it (in which case we
have the records, on the write-path).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]