hudi-agent commented on code in PR #18843: URL: https://github.com/apache/hudi/pull/18843#discussion_r3431489696
########## hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieAppendHandle.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.engine.LocalTaskContextSupplier; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.util.SizeEstimator; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.TestBaseHoodieTable; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +/** + * Unit tests for {@link HoodieAppendHandle}'s block-flush sizing logic. + * + * <p>The block-flush trigger sizes the post-{@code prepareRecord} record (the object actually + * retained in {@code recordList}), not the incoming pre-{@code prepareRecord} record. On Spark + * engines the incoming record is a compact {@code UnsafeRow} while the buffered record is a + * full {@code HoodieAvroIndexedRecord} — sizing the wrong one caused production OOMs on + * metadata-table writes by letting the buffer grow many times past {@code maxBlockSize} worth + * of heap before the gate fired. 
+ */ +@ExtendWith(MockitoExtension.class) +public class TestHoodieAppendHandle extends HoodieCommonTestHarness { + + private static final String TEST_INSTANT_TIME = "20231201120000"; + private static final String TEST_PARTITION_PATH = DEFAULT_FIRST_PARTITION_PATH; + private static final String TEST_FILE_ID = "file-001"; + + private HoodieWriteConfig writeConfig; + private TaskContextSupplier taskContextSupplier; + private HoodieTable hoodieTable; + + @BeforeEach + void setUp() throws IOException { + initPath(); + initMetaClient(false); + initTestDataGenerator(); + + writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(TRIP_EXAMPLE_SCHEMA) + .withMarkersType("DIRECT") + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + .build(); + hoodieTable = new TestBaseHoodieTable(writeConfig, getEngineContext(), metaClient); + taskContextSupplier = new LocalTaskContextSupplier(); + } + + @AfterEach + void tearDown() throws Exception { + cleanMetaClient(); + cleanupTestDataGenerator(); + } + + /** + * The estimator must size the post-{@code prepareRecord} buffered record, not the incoming + * pre-{@code prepareRecord} record. On Spark engines, the incoming record is a compact + * {@code UnsafeRow} while the buffered record is a full {@code HoodieAvroIndexedRecord} — + * sizing the wrong one was the root cause of production OOMs on metadata-table writes. 
+ */ + @Test + void testSizeEstimatorReceivesBufferedRecord() { + RecordingSizeEstimator spy = new RecordingSizeEstimator(1000L); + TestableAppendHandle handle = newTestableHandle(spy); + + HoodieRecord buffered = mock(HoodieRecord.class); + handle.simulateBufferedRecord(buffered); + + assertEquals(1, spy.sizedObjects.size(), "estimator should be invoked once on the first buffered record"); + assertSame(buffered, spy.sizedObjects.get(0), + "estimator must size the post-prepareRecord buffered record, not the incoming record"); + } + + /** + * The initial estimate is seeded lazily on the first buffered record — the old code seeded it + * eagerly from the incoming pre-{@code prepareRecord} record in {@code init()}, which + * dramatically under-counted heap on Spark engines. + */ + @Test + void testInitialEstimateSeededFromFirstBufferedRecord() { + TestableAppendHandle handle = newTestableHandle(new RecordingSizeEstimator(2048L)); + + assertEquals(0L, handle.getAverageRecordSize(), "no records yet — estimate is unseeded"); + + handle.simulateBufferedRecord(mock(HoodieRecord.class)); + + assertEquals(2048L, handle.getAverageRecordSize(), + "first buffered record seeds averageRecordSize directly (no EWMA on first sample)"); + } + + /** + * A delete-only window must not perturb {@code averageRecordSize} and must not trigger a + * flush — {@code recordList} did not grow, so there is no in-heap buffer to bound. 
+ */ + @Test + void testDeleteOnlyDoesNotPerturbEstimateOrFlush() { + RecordingSizeEstimator spy = new RecordingSizeEstimator(1234L); + TestableAppendHandle handle = newTestableHandle(spy); + + for (int i = 0; i < 250; i++) { + handle.simulateBufferedRecord(null); + } + + assertEquals(0, spy.sizedObjects.size(), "estimator never invoked on delete-only windows"); + assertEquals(0L, handle.getAverageRecordSize(), "estimate remains unseeded with no buffered records"); + assertEquals(0, handle.appendInvocations.get(), "flush gate never fires when averageRecordSize is 0"); + } + + /** + * Once seeded, the EWMA blends new samples (20%) with the running estimate (80%). The sampler + * fires every {@code NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE} records. + */ + @Test + void testEwmaBlendsSamplesAfterFirstSeed() { + TestableAppendHandle handle = newTestableHandle(new SteppedSizeEstimator(1000L, 5000L)); + + handle.simulateBufferedRecord(mock(HoodieRecord.class)); + assertEquals(1000L, handle.getAverageRecordSize(), "first record seeds the estimate at 1000"); + + // 99 more records — the sampler will not fire again until numberOfRecords % 100 == 0. + // Record #100 returns the second sample (5000); EWMA blends to 0.8*1000 + 0.2*5000 = 1800. + for (int i = 0; i < 99; i++) { + handle.simulateBufferedRecord(mock(HoodieRecord.class)); + } + assertEquals(1800L, handle.getAverageRecordSize(), "EWMA after the second sample: 0.8*1000 + 0.2*5000"); + } + + /** + * When the per-record estimate reaches {@code maxBlockSize / N}, the gate trips after N + * records — i.e. {@code numberOfRecords >= maxBlockSize / averageRecordSize}. 
+ */ + @Test + void testFlushTriggersWhenBufferedRecordsExceedMaxBlockSize() { + long perRecord = writeConfig.getLogFileDataBlockMaxSize() / 10; + TestableAppendHandle handle = newTestableHandle(new RecordingSizeEstimator(perRecord)); + + for (int i = 0; i < 9; i++) { + handle.simulateBufferedRecord(mock(HoodieRecord.class)); + } + assertEquals(0, handle.appendInvocations.get(), "9 records of perRecord-sized buffer: gate has not fired yet"); + + handle.simulateBufferedRecord(mock(HoodieRecord.class)); + assertEquals(1, handle.appendInvocations.get(), "10th record fills the block — flush fires"); + assertEquals(0L, handle.getNumberOfRecords(), "numberOfRecords resets after flush"); + assertTrue(handle.getEstimatedNumberOfBytesWritten() > 0, "estimatedNumberOfBytesWritten advances after flush"); + } + + /** Guards against the test harness silently no-op'ing the flush hook. */ + @Test + void testHarnessAppendOverrideIsActuallyInvoked() { + TestableAppendHandle handle = + newTestableHandle(new RecordingSizeEstimator(writeConfig.getLogFileDataBlockMaxSize())); + // Single record whose size == maxBlockSize: gate fires immediately on record #1 + // (numberOfRecords == 1 >= maxBlockSize / maxBlockSize == 1). + handle.simulateBufferedRecord(mock(HoodieRecord.class)); + assertEquals(1, handle.appendInvocations.get(), + "harness must route flushes through the override, not the real writer"); + assertNotEquals(0L, handle.getEstimatedNumberOfBytesWritten()); + assertFalse(handle.getAverageRecordSize() == 0L); Review Comment: 🤖 nit: on failure, `assertFalse(x == 0L)` only reports "expected: <false> but was: <true>", which hides what was being compared. Could you use `assertNotEquals(0L, handle.getAverageRecordSize())` instead? `assertNotEquals` is already imported and is used on the line directly above for `getEstimatedNumberOfBytesWritten()`, so this would keep the two checks consistent. Its failure message also states the comparison and the actual value (e.g. "expected: not equal but was: <0>"). <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
