hudi-agent commented on code in PR #18843:
URL: https://github.com/apache/hudi/pull/18843#discussion_r3431489696


##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieAppendHandle.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecord;
+import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.SizeEstimator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.TestBaseHoodieTable;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link HoodieAppendHandle}'s block-flush sizing logic.
+ *
+ * <p>The block-flush trigger sizes the post-{@code prepareRecord} record (the 
object actually
+ * retained in {@code recordList}), not the incoming pre-{@code prepareRecord} 
record. On Spark
+ * engines the incoming record is a compact {@code UnsafeRow} while the 
buffered record is a
+ * full {@code HoodieAvroIndexedRecord} — sizing the wrong one caused 
production OOMs on
+ * metadata-table writes by letting the buffer grow many times past {@code 
maxBlockSize} worth
+ * of heap before the gate fired.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestHoodieAppendHandle extends HoodieCommonTestHarness {
+
+  private static final String TEST_INSTANT_TIME = "20231201120000";
+  private static final String TEST_PARTITION_PATH = 
DEFAULT_FIRST_PARTITION_PATH;
+  private static final String TEST_FILE_ID = "file-001";
+
+  private HoodieWriteConfig writeConfig;
+  private TaskContextSupplier taskContextSupplier;
+  private HoodieTable hoodieTable;
+
+  @BeforeEach
+  void setUp() throws IOException {
+    initPath();
+    initMetaClient(false);
+    initTestDataGenerator();
+
+    writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withMarkersType("DIRECT")
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+    hoodieTable = new TestBaseHoodieTable(writeConfig, getEngineContext(), 
metaClient);
+    taskContextSupplier = new LocalTaskContextSupplier();
+  }
+
+  @AfterEach
+  void tearDown() throws Exception {
+    cleanMetaClient();
+    cleanupTestDataGenerator();
+  }
+
+  /**
+   * The estimator must size the post-{@code prepareRecord} buffered record, 
not the incoming
+   * pre-{@code prepareRecord} record. On Spark engines, the incoming record 
is a compact
+   * {@code UnsafeRow} while the buffered record is a full {@code 
HoodieAvroIndexedRecord} —
+   * sizing the wrong one was the root cause of production OOMs on 
metadata-table writes.
+   */
+  @Test
+  void testSizeEstimatorReceivesBufferedRecord() {
+    RecordingSizeEstimator spy = new RecordingSizeEstimator(1000L);
+    TestableAppendHandle handle = newTestableHandle(spy);
+
+    HoodieRecord buffered = mock(HoodieRecord.class);
+    handle.simulateBufferedRecord(buffered);
+
+    assertEquals(1, spy.sizedObjects.size(), "estimator should be invoked once 
on the first buffered record");
+    assertSame(buffered, spy.sizedObjects.get(0),
+        "estimator must size the post-prepareRecord buffered record, not the 
incoming record");
+  }
+
+  /**
+   * The initial estimate is seeded lazily on the first buffered record — the 
old code seeded it
+   * eagerly from the incoming pre-{@code prepareRecord} record in {@code 
init()}, which
+   * dramatically under-counted heap on Spark engines.
+   */
+  @Test
+  void testInitialEstimateSeededFromFirstBufferedRecord() {
+    TestableAppendHandle handle = newTestableHandle(new 
RecordingSizeEstimator(2048L));
+
+    assertEquals(0L, handle.getAverageRecordSize(), "no records yet — estimate 
is unseeded");
+
+    handle.simulateBufferedRecord(mock(HoodieRecord.class));
+
+    assertEquals(2048L, handle.getAverageRecordSize(),
+        "first buffered record seeds averageRecordSize directly (no EWMA on 
first sample)");
+  }
+
+  /**
+   * A delete-only window must not perturb {@code averageRecordSize} and must 
not trigger a
+   * flush — {@code recordList} did not grow, so there is no in-heap buffer to 
bound.
+   */
+  @Test
+  void testDeleteOnlyDoesNotPerturbEstimateOrFlush() {
+    RecordingSizeEstimator spy = new RecordingSizeEstimator(1234L);
+    TestableAppendHandle handle = newTestableHandle(spy);
+
+    for (int i = 0; i < 250; i++) {
+      handle.simulateBufferedRecord(null);
+    }
+
+    assertEquals(0, spy.sizedObjects.size(), "estimator never invoked on 
delete-only windows");
+    assertEquals(0L, handle.getAverageRecordSize(), "estimate remains unseeded 
with no buffered records");
+    assertEquals(0, handle.appendInvocations.get(), "flush gate never fires 
when averageRecordSize is 0");
+  }
+
+  /**
+   * Once seeded, the EWMA blends new samples (20%) with the running estimate 
(80%). The sampler
+   * fires every {@code NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE} records.
+   */
+  @Test
+  void testEwmaBlendsSamplesAfterFirstSeed() {
+    TestableAppendHandle handle = newTestableHandle(new 
SteppedSizeEstimator(1000L, 5000L));
+
+    handle.simulateBufferedRecord(mock(HoodieRecord.class));
+    assertEquals(1000L, handle.getAverageRecordSize(), "first record seeds the 
estimate at 1000");
+
+    // 99 more records — the sampler will not fire again until numberOfRecords 
% 100 == 0.
+    // Record #100 returns the second sample (5000); EWMA blends to 0.8*1000 + 
0.2*5000 = 1800.
+    for (int i = 0; i < 99; i++) {
+      handle.simulateBufferedRecord(mock(HoodieRecord.class));
+    }
+    assertEquals(1800L, handle.getAverageRecordSize(), "EWMA after the second 
sample: 0.8*1000 + 0.2*5000");
+  }
+
+  /**
+   * When the per-record estimate reaches {@code maxBlockSize / N}, the gate 
trips after N
+   * records — i.e. {@code numberOfRecords >= maxBlockSize / 
averageRecordSize}.
+   */
+  @Test
+  void testFlushTriggersWhenBufferedRecordsExceedMaxBlockSize() {
+    long perRecord = writeConfig.getLogFileDataBlockMaxSize() / 10;
+    TestableAppendHandle handle = newTestableHandle(new 
RecordingSizeEstimator(perRecord));
+
+    for (int i = 0; i < 9; i++) {
+      handle.simulateBufferedRecord(mock(HoodieRecord.class));
+    }
+    assertEquals(0, handle.appendInvocations.get(), "9 records of 
perRecord-sized buffer: gate has not fired yet");
+
+    handle.simulateBufferedRecord(mock(HoodieRecord.class));
+    assertEquals(1, handle.appendInvocations.get(), "10th record fills the 
block — flush fires");
+    assertEquals(0L, handle.getNumberOfRecords(), "numberOfRecords resets 
after flush");
+    assertTrue(handle.getEstimatedNumberOfBytesWritten() > 0, 
"estimatedNumberOfBytesWritten advances after flush");
+  }
+
+  /** Guards against the test harness silently no-op'ing the flush hook. */
+  @Test
+  void testHarnessAppendOverrideIsActuallyInvoked() {
+    TestableAppendHandle handle =
+        newTestableHandle(new 
RecordingSizeEstimator(writeConfig.getLogFileDataBlockMaxSize()));
+    // Single record whose size == maxBlockSize: gate fires immediately on 
record #1
+    // (numberOfRecords == 1 >= maxBlockSize / maxBlockSize == 1).
+    handle.simulateBufferedRecord(mock(HoodieRecord.class));
+    assertEquals(1, handle.appendInvocations.get(),
+        "harness must route flushes through the override, not the real 
writer");
+    assertNotEquals(0L, handle.getEstimatedNumberOfBytesWritten());
+    assertFalse(handle.getAverageRecordSize() == 0L);

Review Comment:
   🤖 nit: `assertFalse(x == 0L)` produces a failure message of "expected: 
<false> but was: <true>" — could you use `assertNotEquals(0L, 
handle.getAverageRecordSize())` instead? It's already imported and gives a much 
more informative failure message.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to