Copilot commented on code in PR #10100: URL: https://github.com/apache/ozone/pull/10100#discussion_r3331676990
########## hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHATransactionBufferMonitorTask.java: ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.ha; + +import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.protobuf.ByteString; +import java.io.File; +import java.time.Clock; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.block.BlockManager; +import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; +import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; +import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.utils.TransactionInfo; +import 
org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Tests for {@link SCMHATransactionBufferMonitorTask} and the flush race + * conditions it can trigger against {@link SCMHADBTransactionBufferImpl}. + */ +public class TestSCMHATransactionBufferMonitorTask { + + private static final long FLUSH_INTERVAL_MS = 1000L; + private static final TransactionInfo TRX_INFO_T4 = + TransactionInfo.valueOf(1, 4); + private static final TransactionInfo TRX_INFO_T5 = + TransactionInfo.valueOf(1, 5); + + @TempDir + private File testDir; + + private final AtomicLong clockMillis = new AtomicLong(0); + private SCMMetadataStore metadataStore; + private SCMHADBTransactionBufferImpl transactionBuffer; + private Table<String, ByteString> statefulServiceConfigTable; + private Table<String, TransactionInfo> transactionInfoTable; + + @BeforeEach + public void setup() throws Exception { + OzoneConfiguration conf = SCMTestUtils.getConf(testDir); + metadataStore = new SCMMetadataStoreImpl(conf); + statefulServiceConfigTable = metadataStore.getStatefulServiceConfigTable(); + transactionInfoTable = metadataStore.getTransactionInfoTable(); + + StorageContainerManager scm = mock(StorageContainerManager.class); + BlockManager blockManager = mock(BlockManager.class); + DeletedBlockLogImpl deletedBlockLog = mock(DeletedBlockLogImpl.class); + Clock clock = mock(Clock.class); + when(clock.millis()).thenAnswer(invocation -> clockMillis.get()); + when(scm.getScmMetadataStore()).thenReturn(metadataStore); + when(scm.getSystemClock()).thenReturn(clock); + when(scm.getScmBlockManager()).thenReturn(blockManager); + when(blockManager.getDeletedBlockLog()).thenReturn(deletedBlockLog); + + transactionBuffer = new SCMHADBTransactionBufferImpl(scm); + clockMillis.set(FLUSH_INTERVAL_MS + 1); + } + + 
@AfterEach + public void cleanup() throws Exception { + if (transactionBuffer != null) { + transactionBuffer.close(); + } + if (metadataStore != null) { + metadataStore.stop(); + } + } + + private void advanceClockPastFlushInterval() { + clockMillis.addAndGet(FLUSH_INTERVAL_MS + 1); + } + + /** + * Demonstrates the partial flush race when shouldFlush and flush are called + * separately: buffered data can be committed with a stale transaction index. + */ + @Test + public void testPartialFlushWithSeparateShouldFlushAndFlush() throws Exception { + transactionBuffer.updateLatestTrxInfo(TRX_INFO_T4); + transactionBuffer.flush(); + + transactionBuffer.addToBuffer(statefulServiceConfigTable, "key", + ByteString.copyFromUtf8("value")); + + advanceClockPastFlushInterval(); + if (transactionBuffer.shouldFlush(FLUSH_INTERVAL_MS)) { + transactionBuffer.flush(); + } + + assertEquals(TRX_INFO_T4, transactionInfoTable.get(TRANSACTION_INFO_KEY)); + assertEquals(ByteString.copyFromUtf8("value"), + statefulServiceConfigTable.get("key")); + } + + @Test + public void testFlushIfNeededDoesNotFlushDuringTransactionApply() + throws Exception { + transactionBuffer.updateLatestTrxInfo(TRX_INFO_T4); + transactionBuffer.flush(); + + transactionBuffer.beginApplyingTransaction(); + try { + transactionBuffer.addToBuffer(statefulServiceConfigTable, "key", + ByteString.copyFromUtf8("value")); + transactionBuffer.flushIfNeeded(FLUSH_INTERVAL_MS); + assertNull(statefulServiceConfigTable.get("key")); + } finally { + transactionBuffer.endApplyingTransaction(); + } + + transactionBuffer.updateLatestTrxInfo(TRX_INFO_T5); + advanceClockPastFlushInterval(); + transactionBuffer.flushIfNeeded(FLUSH_INTERVAL_MS); + + assertEquals(TRX_INFO_T5, transactionInfoTable.get(TRANSACTION_INFO_KEY)); + assertEquals(ByteString.copyFromUtf8("value"), + statefulServiceConfigTable.get("key")); + } + + /** + * Demonstrates that calling flush() directly inside an applyTransaction + * window (the old behaviour of 
StatefulServiceStateManagerImpl) persists + * the batch with the stale transaction index that was current before the + * apply updated it. + */ + @Test + public void testDirectFlushDuringApplyWritesStaleTransactionInfo() + throws Exception { + transactionBuffer.updateLatestTrxInfo(TRX_INFO_T4); + transactionBuffer.flush(); + + transactionBuffer.beginApplyingTransaction(); + try { + transactionBuffer.addToBuffer(statefulServiceConfigTable, "key", + ByteString.copyFromUtf8("value")); + // Old saveConfiguration behaviour: flush() before updateLatestTrxInfo. + transactionBuffer.flush(); + // Data is on disk, but the transaction index is still T4 — stale. + assertEquals(TRX_INFO_T4, transactionInfoTable.get(TRANSACTION_INFO_KEY)); + assertEquals(ByteString.copyFromUtf8("value"), + statefulServiceConfigTable.get("key")); + } finally { + transactionBuffer.updateLatestTrxInfo(TRX_INFO_T5); + transactionBuffer.endApplyingTransaction(); + } + } + + /** + * Verifies that using flushIfNeeded(0) instead of flush() inside an apply + * window defers the write until after updateLatestTrxInfo(), keeping the + * on-disk transaction index consistent with the buffered data. + */ + @Test + public void testFlushIfNeededZeroWaitDefersDuringApply() throws Exception { + transactionBuffer.updateLatestTrxInfo(TRX_INFO_T4); + transactionBuffer.flush(); + + transactionBuffer.beginApplyingTransaction(); + try { + transactionBuffer.addToBuffer(statefulServiceConfigTable, "key", + ByteString.copyFromUtf8("value")); + // New saveConfiguration behaviour: skipped because apply is in progress. + transactionBuffer.flushIfNeeded(0); + assertNull(statefulServiceConfigTable.get("key"), + "flushIfNeeded must not flush while a transaction is being applied"); + } finally { + transactionBuffer.updateLatestTrxInfo(TRX_INFO_T5); + transactionBuffer.endApplyingTransaction(); + } + + // After the apply window closes, the monitor flushes both data and the + // correct transaction index atomically. 
+ advanceClockPastFlushInterval(); + transactionBuffer.flushIfNeeded(FLUSH_INTERVAL_MS); + + assertEquals(TRX_INFO_T5, transactionInfoTable.get(TRANSACTION_INFO_KEY)); + assertEquals(ByteString.copyFromUtf8("value"), + statefulServiceConfigTable.get("key")); + } + + @Test + public void testMonitorTaskDoesNotPartialFlushDuringTransactionApply() + throws Exception { + transactionBuffer.updateLatestTrxInfo(TRX_INFO_T4); + transactionBuffer.flush(); + + CountDownLatch addedToBuffer = new CountDownLatch(1); + CountDownLatch allowFinishApply = new CountDownLatch(1); + CountDownLatch applyFinished = new CountDownLatch(1); + SCMHATransactionBufferMonitorTask monitorTask = + new SCMHATransactionBufferMonitorTask(transactionBuffer, FLUSH_INTERVAL_MS); + + Thread applyThread = new Thread(() -> { + transactionBuffer.beginApplyingTransaction(); + try { + try { + transactionBuffer.addToBuffer(statefulServiceConfigTable, "key", + ByteString.copyFromUtf8("value")); + } catch (Exception e) { + throw new RuntimeException(e); + } + addedToBuffer.countDown(); + try { + allowFinishApply.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + transactionBuffer.updateLatestTrxInfo(TRX_INFO_T5); + } finally { + transactionBuffer.endApplyingTransaction(); + applyFinished.countDown(); + } + }); + + Thread monitorThread = new Thread(() -> { + try { + while (!applyFinished.await(10, TimeUnit.MILLISECONDS)) { + monitorTask.run(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + applyThread.start(); + monitorThread.start(); + + assertTrue(addedToBuffer.await(10, TimeUnit.SECONDS), + "Timed out waiting for applyThread to add data to buffer"); + monitorTask.run(); + assertNull(statefulServiceConfigTable.get("key"), + "Monitor must not flush before transaction info is updated"); + + allowFinishApply.countDown(); + applyThread.join(10_000); + monitorThread.join(10_000); 
Review Comment: This test uses timed `join()` calls but does not assert that the threads actually terminated. If either thread fails to finish, it can leave a non-daemon thread running and potentially hang the test JVM later. Add assertions after the joins (or interrupt the threads on timeout) to make failures deterministic. ########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java: ########## @@ -124,30 +126,64 @@ public AtomicReference<SnapshotInfo> getLatestSnapshotRef() { public void flush() throws RocksDatabaseException, CodecException { rwLock.writeLock().lock(); try { - // write latest trx info into trx table in the same batch - Table<String, TransactionInfo> transactionInfoTable - = metadataStore.getTransactionInfoTable(); - transactionInfoTable.putWithBatch(currentBatchOperation, - TRANSACTION_INFO_KEY, latestTrxInfo); + flushUnderWriteLock(); + } finally { + rwLock.writeLock().unlock(); + } + } - metadataStore.getStore().commitBatchOperation(currentBatchOperation); - currentBatchOperation.close(); - this.latestSnapshot.set(latestTrxInfo.toSnapshotInfo()); - // reset batch operation - currentBatchOperation = metadataStore.getStore().initBatchOperation(); - - DeletedBlockLog deletedBlockLog = scm.getScmBlockManager() - .getDeletedBlockLog(); - Preconditions.checkArgument( - deletedBlockLog instanceof DeletedBlockLogImpl); - ((DeletedBlockLogImpl) deletedBlockLog).onFlush(); + @Override + public void flushIfNeeded(long snapshotWaitTime) + throws RocksDatabaseException, CodecException { + rwLock.writeLock().lock(); + try { + if (applyingTransactions.get() > 0) { + return; + } + long timeDiff = scm.getSystemClock().millis() - lastSnapshotTimeMs; + if (txFlushPending.get() > 0 && timeDiff > snapshotWaitTime) { + LOG.debug("Running TransactionFlushTask"); + flushUnderWriteLock(); + } } finally { - txFlushPending.set(0); - lastSnapshotTimeMs = scm.getSystemClock().millis(); rwLock.writeLock().unlock(); } } + 
private void flushUnderWriteLock() + throws RocksDatabaseException, CodecException { + // write latest trx info into trx table in the same batch + Table<String, TransactionInfo> transactionInfoTable + = metadataStore.getTransactionInfoTable(); + transactionInfoTable.putWithBatch(currentBatchOperation, + TRANSACTION_INFO_KEY, latestTrxInfo); + + metadataStore.getStore().commitBatchOperation(currentBatchOperation); + currentBatchOperation.close(); + this.latestSnapshot.set(latestTrxInfo.toSnapshotInfo()); + // reset batch operation + currentBatchOperation = metadataStore.getStore().initBatchOperation(); + + DeletedBlockLog deletedBlockLog = scm.getScmBlockManager() + .getDeletedBlockLog(); + Preconditions.checkArgument( + deletedBlockLog instanceof DeletedBlockLogImpl); + ((DeletedBlockLogImpl) deletedBlockLog).onFlush(); + + txFlushPending.set(0); + lastSnapshotTimeMs = scm.getSystemClock().millis(); + } + + @Override + public void beginApplyingTransaction() { + applyingTransactions.incrementAndGet(); + } + + @Override + public void endApplyingTransaction() { + applyingTransactions.decrementAndGet(); + } Review Comment: `endApplyingTransaction()` can drive `applyingTransactions` negative if it’s ever called more times than `beginApplyingTransaction()` (e.g., future refactors or error paths). A negative value would incorrectly allow `flushIfNeeded` to flush during an apply window (since the check is `> 0`). Consider guarding against underflow and failing fast when the begin/end pairing is violated. 
########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHATransactionBufferMonitorTask.java: ########## @@ -29,39 +28,24 @@ public class SCMHATransactionBufferMonitorTask implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(SCMHATransactionBufferMonitorTask.class); - private final SCMRatisServer server; private final SCMHADBTransactionBuffer transactionBuffer; private final long flushInterval; /** * SCMService related variables. */ public SCMHATransactionBufferMonitorTask( - SCMHADBTransactionBuffer transactionBuffer, - SCMRatisServer server, long flushInterval) { + SCMHADBTransactionBuffer transactionBuffer, long flushInterval) { this.flushInterval = flushInterval; this.transactionBuffer = transactionBuffer; - this.server = server; } @Override public void run() { - if (transactionBuffer.shouldFlush(flushInterval)) { - LOG.debug("Running TransactionFlushTask"); - // set latest snapshot to null for force snapshot - // the value will be reset again when snapshot is taken - final SnapshotInfo lastSnapshot = transactionBuffer - .getLatestSnapshotRef().getAndSet(null); - try { - server.triggerSnapshot(); - } catch (IOException e) { - LOG.error("Snapshot request is failed", e); - } finally { - // under failure case, if unable to take snapshot, its value - // is reset to previous known value - transactionBuffer.getLatestSnapshotRef().compareAndSet( - null, lastSnapshot); - } + try { + transactionBuffer.flushIfNeeded(flushInterval); + } catch (IOException e) { + LOG.error("TransactionFlushTask is failed", e); } Review Comment: The error log message is grammatically incorrect ("is failed"). Consider updating it so logs are clearer and searchable (e.g., grepping for "failed"). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
