alexeykudinkin commented on code in PR #7978:
URL: https://github.com/apache/hudi/pull/7978#discussion_r1117349049
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java:
##########
@@ -36,11 +36,10 @@
*/
public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {
- private static final int WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK = 1000;
-
+ private final HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig;
private final AtomicLong writtenRecordCount = new AtomicLong(0);
private final long maxFileSize;
- private long lastCachedDataSize = -1;
+ private long recordNumForNextCheck;
Review Comment:
nit: `recordCountForNextSizeCheck`
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java:
##########
@@ -34,6 +37,8 @@
private final Configuration hadoopConf;
private final double compressionRatio;
private final boolean dictionaryEnabled;
+ private final long minRowCountForSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
Review Comment:
@boneanxs since there's already a Parquet config for this, let's just re-use it (I was advising against introducing a new Hudi config dedicated to it, but we should actually reuse the existing Parquet one)
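For context, here is a minimal standalone sketch of what reusing Parquet's existing bounds could look like. The constant values (100 and 10,000) mirror the defaults of `DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK` / `DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK` in parquet-mr's `ParquetProperties` at the time of writing; they are hard-coded copies here so the example has no parquet dependency, and the class/method names are invented for illustration only.

```java
// Sketch (assumed defaults, hypothetical class): clamp a proposed
// size-check interval into Parquet's [min, max] record-count window
// instead of introducing a new Hudi config for the same knob.
public class SizeCheckBounds {
  // Copies of parquet-mr's ParquetProperties defaults (assumption).
  static final long DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
  static final long DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10_000;

  // Clamp a proposed check interval into [min, max].
  static long clampCheckInterval(long proposed) {
    return Math.min(
        Math.max(proposed, DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK),
        DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
  }

  public static void main(String[] args) {
    System.out.println(clampCheckInterval(7));      // below min -> 100
    System.out.println(clampCheckInterval(5_000));  // in range  -> 5000
    System.out.println(clampCheckInterval(50_000)); // above max -> 10000
  }
}
```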
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java:
##########
@@ -56,23 +55,35 @@ public HoodieBaseParquetWriter(Path file,
DEFAULT_WRITER_VERSION,
FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
+ this.parquetConfig = parquetConfig;
// We cannot accurately measure the snappy compressed output file size. We are choosing a
// conservative 10%
// TODO - compute this compression ratio dynamically by looking at the bytes written to the
// stream and the actual file size reported by HDFS
this.maxFileSize = parquetConfig.getMaxFileSize()
    + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
+ this.recordNumForNextCheck = parquetConfig.getMinRowCountForSizeCheck();
}
public boolean canWrite() {
- // TODO we can actually do evaluation more accurately:
- //       if we cache last data size check, since we account for how many records
- //       were written we can accurately project avg record size, and therefore
- //       estimate how many more records we can write before cut off
- if (lastCachedDataSize == -1 || getWrittenRecordCount() % WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK == 0) {
- lastCachedDataSize = getDataSize();
+ if (getWrittenRecordCount() >= recordNumForNextCheck) {
+ long dataSize = getDataSize();
+ // In some very extreme cases, like all records are same value, then it's possible
+ // the dataSize is much lower than the writtenRecordCount (high compression ratio),
+ // causing avgRecordSize to 0, we'll force the avgRecordSize to 1 for such cases.
+ long avgRecordSize = Math.max(dataSize / getWrittenRecordCount(), 1);
+ // Follow the parquet block size check logic here, return false
+ // if it is within ~2 records of the limit
+ if (dataSize > (maxFileSize - avgRecordSize * 2)) {
+ return false;
+ }
+ recordNumForNextCheck = Math.min(
Review Comment:
Let's simplify this formula to make it more easily digestible:
```
writtenCount + Math.min(Math.max((maxFileSize / avgRecordSize - writtenCount) / 2, minCountForCheck), maxCountForCheck)
```
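To see the suggested formula in action, here is a hypothetical standalone sketch: the class and method names are invented for illustration, and only the arithmetic comes from the review comment above. It halves the distance to the projected record limit, clamps that step into `[minCountForCheck, maxCountForCheck]`, and schedules the next size check that many records ahead.

```java
// Hypothetical sketch of the reviewer's simplified scheduling formula.
// Only the arithmetic is from the review; names are for illustration.
public class NextCheckFormula {
  static long nextRecordCountForCheck(long writtenCount, long maxFileSize,
      long avgRecordSize, long minCountForCheck, long maxCountForCheck) {
    // maxFileSize / avgRecordSize projects how many records fit in total;
    // step halfway toward that limit, clamped into [min, max].
    return writtenCount + Math.min(
        Math.max((maxFileSize / avgRecordSize - writtenCount) / 2, minCountForCheck),
        maxCountForCheck);
  }

  public static void main(String[] args) {
    // e.g. 128 MB target, 1 KB average records, 10k written so far:
    // projected limit is ~131072 records; half the remaining gap (~60536)
    // exceeds maxCountForCheck, so the next check is 10k records away.
    System.out.println(nextRecordCountForCheck(10_000, 128L << 20, 1_024, 100, 10_000));
  }
}
```

Early in the file the interval is capped at `maxCountForCheck` (cheap, infrequent checks); near the limit the halved gap shrinks below `minCountForCheck` and the clamp keeps checks from becoming too frequent.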
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]