This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ed3b699877 HDDS-10026. Remove applyTransactionMap and
ratisTransactionMap from OzoneManagerStateMachine. (#5891)
ed3b699877 is described below
commit ed3b699877cdebdb77bf84abc49b4000860f40b0
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat Jan 13 23:30:28 2024 -0800
HDDS-10026. Remove applyTransactionMap and ratisTransactionMap from
OzoneManagerStateMachine. (#5891)
---
.../ozone/AbstractRootedOzoneFileSystemTest.java | 13 +-
.../client/rpc/TestOzoneRpcClientWithRatis.java | 6 -
.../ozone/om/ratis/OzoneManagerDoubleBuffer.java | 28 +-
.../ozone/om/ratis/OzoneManagerRatisServer.java | 3 +-
.../ozone/om/ratis/OzoneManagerRatisSnapshot.java | 34 ---
.../ozone/om/ratis/OzoneManagerStateMachine.java | 282 ++++++---------------
.../metrics/OzoneManagerStateMachineMetrics.java | 11 -
.../ozone/om/request/upgrade/OMPrepareRequest.java | 48 ++--
...OzoneManagerProtocolServerSideTranslatorPB.java | 13 +-
.../om/ratis/TestOzoneManagerDoubleBuffer.java | 3 -
...tOzoneManagerDoubleBufferWithDummyResponse.java | 13 +-
...TestOzoneManagerDoubleBufferWithOMResponse.java | 30 +--
.../om/ratis/TestOzoneManagerStateMachine.java | 157 ++----------
13 files changed, 165 insertions(+), 476 deletions(-)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/AbstractRootedOzoneFileSystemTest.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/AbstractRootedOzoneFileSystemTest.java
index a3555fcf52..61b0281c65 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/AbstractRootedOzoneFileSystemTest.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/AbstractRootedOzoneFileSystemTest.java
@@ -163,12 +163,11 @@ abstract class AbstractRootedOzoneFileSystemTest {
@AfterAll
void shutdown() {
- IOUtils.closeQuietly(client);
+ IOUtils.closeQuietly(fs, userOfs, client);
// Tear down the cluster after EACH set of parameters
if (cluster != null) {
cluster.shutdown();
}
- IOUtils.closeQuietly(fs, userOfs);
}
@BeforeEach
@@ -276,15 +275,7 @@ abstract class AbstractRootedOzoneFileSystemTest {
-> (RootedOzoneFileSystem) FileSystem.get(conf));
if (useOnlyCache) {
- if (omRatisEnabled) {
- cluster.getOzoneManager().getOmRatisServer().getOmStateMachine()
- .getOzoneManagerDoubleBuffer().stopDaemon();
- } else {
- cluster.getOzoneManager().getOmServerProtocol()
- .getOzoneManagerDoubleBuffer().stopDaemon();
- cluster.getOzoneManager().getOmServerProtocol()
- .setShouldFlushCache(false);
- }
+
cluster.getOzoneManager().getOmServerProtocol().setShouldFlushCache(omRatisEnabled);
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index 11ce62d278..253193c92e 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
-import
org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerStateMachineMetrics;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -302,7 +301,6 @@ public class TestOzoneRpcClientWithRatis extends
TestOzoneRpcClientAbstract {
.captureLogs(OzoneManagerStateMachine.LOG);
OzoneManagerStateMachine omSM = getCluster().getOzoneManager()
.getOmRatisServer().getOmStateMachine();
- OzoneManagerStateMachineMetrics metrics = omSM.getMetrics();
Thread thread1 = new Thread(() -> {
try {
@@ -326,8 +324,6 @@ public class TestOzoneRpcClientWithRatis extends
TestOzoneRpcClientAbstract {
omSM.getHandler().setInjector(injector);
thread1.start();
thread2.start();
- GenericTestUtils.waitFor(() -> metrics.getApplyTransactionMapSize() > 0,
- 100, 5000);
Thread.sleep(2000);
injector.resume();
@@ -349,8 +345,6 @@ public class TestOzoneRpcClientWithRatis extends
TestOzoneRpcClientAbstract {
}
assertThat(omSMLog.getOutput()).contains("Failed to write, Exception
occurred");
- GenericTestUtils.waitFor(() -> metrics.getApplyTransactionMapSize() == 0,
- 100, 5000);
}
private static class OMRequestHandlerPauseInjector extends FaultInjector {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 96bad99e87..d1d971f3f4 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -111,8 +112,7 @@ public final class OzoneManagerDoubleBuffer {
private final OzoneManagerDoubleBufferMetrics
ozoneManagerDoubleBufferMetrics;
private long maxFlushedTransactionsInOneIteration;
- private final OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot;
-
+ private final Consumer<TermIndex> updateLastAppliedIndex;
private final boolean isRatisEnabled;
private final boolean isTracingEnabled;
private final Semaphore unFlushedTransactions;
@@ -124,7 +124,7 @@ public final class OzoneManagerDoubleBuffer {
*/
public static class Builder {
private OMMetadataManager mm;
- private OzoneManagerRatisSnapshot rs;
+ private Consumer<TermIndex> updateLastAppliedIndex = termIndex -> { };
private boolean isRatisEnabled = false;
private boolean isTracingEnabled = false;
private int maxUnFlushedTransactionCount = 0;
@@ -138,9 +138,8 @@ public final class OzoneManagerDoubleBuffer {
return this;
}
- public Builder setOzoneManagerRatisSnapShot(
- OzoneManagerRatisSnapshot omrs) {
- this.rs = omrs;
+ Builder setUpdateLastAppliedIndex(Consumer<TermIndex>
updateLastAppliedIndex) {
+ this.updateLastAppliedIndex = updateLastAppliedIndex;
return this;
}
@@ -176,8 +175,6 @@ public final class OzoneManagerDoubleBuffer {
public OzoneManagerDoubleBuffer build() {
if (isRatisEnabled) {
- Preconditions.checkNotNull(rs, "When ratis is enabled, " +
- "OzoneManagerRatisSnapshot should not be null");
Preconditions.checkState(maxUnFlushedTransactionCount > 0L,
"when ratis is enable, maxUnFlushedTransactions " +
"should be bigger than 0");
@@ -186,7 +183,7 @@ public final class OzoneManagerDoubleBuffer {
flushNotifier = new FlushNotifier();
}
- return new OzoneManagerDoubleBuffer(mm, rs, isRatisEnabled,
+ return new OzoneManagerDoubleBuffer(mm, updateLastAppliedIndex,
isRatisEnabled,
isTracingEnabled, maxUnFlushedTransactionCount,
flushNotifier, s3SecretManager, threadPrefix);
}
@@ -194,7 +191,7 @@ public final class OzoneManagerDoubleBuffer {
@SuppressWarnings("checkstyle:parameternumber")
private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
- OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot,
+ Consumer<TermIndex> updateLastAppliedIndex,
boolean isRatisEnabled, boolean isTracingEnabled,
int maxUnFlushedTransactions,
FlushNotifier flushNotifier, S3SecretManager s3SecretManager,
@@ -205,7 +202,7 @@ public final class OzoneManagerDoubleBuffer {
this.isTracingEnabled = isTracingEnabled;
this.unFlushedTransactions = new Semaphore(maxUnFlushedTransactions);
this.omMetadataManager = omMetadataManager;
- this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
+ this.updateLastAppliedIndex = updateLastAppliedIndex;
this.ozoneManagerDoubleBufferMetrics =
OzoneManagerDoubleBufferMetrics.create();
this.flushNotifier = flushNotifier;
@@ -335,9 +332,8 @@ public final class OzoneManagerDoubleBuffer {
.map(Entry::getTermIndex)
.sorted()
.collect(Collectors.toList());
- final List<Long> flushedEpochs = flushedTransactions.stream()
- .map(TermIndex::getIndex)
- .collect(Collectors.toList());
+ final int flushedTransactionsSize = flushedTransactions.size();
+ final TermIndex lastTransaction =
flushedTransactions.get(flushedTransactionsSize - 1);
try (BatchOperation batchOperation = omMetadataManager.getStore()
.initBatchOperation()) {
@@ -347,7 +343,6 @@ public final class OzoneManagerDoubleBuffer {
buffer.iterator().forEachRemaining(
entry -> addCleanupEntry(entry, cleanupEpochs));
- final TermIndex lastTransaction =
flushedTransactions.get(flushedTransactions.size() - 1);
addToBatchTransactionInfoWithTrace(lastTraceId,
lastTransaction.getIndex(),
@@ -372,7 +367,6 @@ public final class OzoneManagerDoubleBuffer {
.forEach(f -> f.complete(null));
}
- int flushedTransactionsSize = buffer.size();
flushedTransactionCount.addAndGet(flushedTransactionsSize);
flushIterations.incrementAndGet();
@@ -389,7 +383,7 @@ public final class OzoneManagerDoubleBuffer {
releaseUnFlushedTransactions(flushedTransactionsSize);
}
// update the last updated index in OzoneManagerStateMachine.
- ozoneManagerRatisSnapShot.updateLastAppliedIndex(flushedEpochs);
+ updateLastAppliedIndex.accept(lastTransaction);
// set metrics.
updateMetrics(flushedTransactionsSize);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index e6a747b35a..6d7e117ada 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -591,9 +591,10 @@ public final class OzoneManagerRatisServer {
}
public void stop() {
+ LOG.info("Stopping {} at port {}", this, port);
try {
+ // Ratis will also close the state machine
server.close();
- omStateMachine.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisSnapshot.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisSnapshot.java
deleted file mode 100644
index aa0e6d98f8..0000000000
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisSnapshot.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
under
- * the License.
- */
-
-package org.apache.hadoop.ozone.om.ratis;
-
-import java.util.List;
-
-/**
- * Functional interface for OM RatisSnapshot.
- */
-
-public interface OzoneManagerRatisSnapshot {
-
- /**
- * Update lastAppliedIndex in OzoneManager StateMachine.
- * @param flushedEpochs - list of ratis transaction indexes which are
- * flushed to DB.
- */
- void updateLastAppliedIndex(List<Long> flushedEpochs);
-}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 46979b6e7f..45767ec7d0 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -26,15 +26,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -66,6 +62,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.TransactionContext;
@@ -73,6 +70,7 @@ import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.ExitUtils;
+import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,15 +101,11 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
private final AtomicInteger statePausedCount = new AtomicInteger(0);
private final String threadPrefix;
- // Map which contains index and term for the ratis transactions which are
- // stateMachine entries which are received through applyTransaction.
- private ConcurrentMap<Long, Long> applyTransactionMap =
- new ConcurrentSkipListMap<>();
+ /** The last {@link TermIndex} received from {@link
#notifyTermIndexUpdated(long, long)}. */
+ private volatile TermIndex lastNotifiedTermIndex = TermIndex.valueOf(0,
RaftLog.INVALID_LOG_INDEX);
+ /** The last index skipped by {@link #notifyTermIndexUpdated(long, long)}. */
+ private volatile long lastSkippedIndex = RaftLog.INVALID_LOG_INDEX;
- // Map which contains index and term for the ratis transactions which are
- // conf/metadata entries which are received through notifyIndexUpdate.
- private ConcurrentMap<Long, Long> ratisTransactionMap =
- new ConcurrentSkipListMap<>();
private OzoneManagerStateMachineMetrics metrics;
@@ -177,28 +171,32 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
ozoneManager.omHAMetricsInit(newLeaderId.toString());
}
- /**
- * Called to notify state machine about indexes which are processed
- * internally by Raft Server, this currently happens when conf entries are
- * processed in raft Server. This keep state machine to keep a track of index
- * updates.
- * @param currentTerm term of the current log entry
- * @param index index which is being updated
- */
+ /** Notified by Ratis for non-StateMachine term-index update. */
+ @Override
+ public synchronized void notifyTermIndexUpdated(long currentTerm, long
newIndex) {
+ final long oldIndex = lastNotifiedTermIndex.getIndex();
+ if (newIndex - oldIndex > 1) {
+ lastSkippedIndex = newIndex - 1;
+ }
+ final TermIndex newTermIndex = TermIndex.valueOf(currentTerm, newIndex);
+ lastNotifiedTermIndex = assertUpdateIncreasingly("lastNotified",
lastNotifiedTermIndex, newTermIndex);
+ }
+
+ public TermIndex getLastNotifiedTermIndex() {
+ return lastNotifiedTermIndex;
+ }
+
@Override
- public void notifyTermIndexUpdated(long currentTerm, long index) {
- // SnapshotInfo should be updated when the term changes.
- // The index here refers to the log entry index and the index in
- // SnapshotInfo represents the snapshotIndex i.e. the index of the last
- // transaction included in the snapshot. Hence, snaphsotInfo#index is not
- // updated here.
+ protected synchronized boolean updateLastAppliedTermIndex(TermIndex
newTermIndex) {
+ assertUpdateIncreasingly("lastApplied", getLastAppliedTermIndex(),
newTermIndex);
+ return super.updateLastAppliedTermIndex(newTermIndex);
+ }
- // We need to call updateLastApplied here because now in ratis when a
- // node becomes leader, it is checking stateMachineIndex >=
- // placeHolderIndex (when a node becomes leader, it writes a conf entry
- // with some information like its peers and termIndex). So, calling
- // updateLastApplied updates lastAppliedTermIndex.
- computeAndUpdateLastAppliedIndex(index, currentTerm, null, false);
+ /** Assert if the given {@link TermIndex} is updated increasingly. */
+ private TermIndex assertUpdateIncreasingly(String name, TermIndex
oldTermIndex, TermIndex newTermIndex) {
+ Preconditions.checkArgument(newTermIndex.compareTo(oldTermIndex) >= 0,
+ "%s: newTermIndex = %s < oldTermIndex = %s", name, newTermIndex,
oldTermIndex);
+ return newTermIndex;
}
/**
@@ -331,7 +329,6 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
: OMRatisHelper.convertByteStringToOMRequest(
trx.getStateMachineLogEntry().getLogData());
final TermIndex termIndex = TermIndex.valueOf(trx.getLogEntry());
- final long trxLogIndex = termIndex.getIndex();
// In the current approach we have one single global thread executor.
// with single thread. Right now this is being done for correctness, as
// applyTransaction will be run on multiple OM's we want to execute the
@@ -346,72 +343,33 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
// lastAppliedIndex in OzoneManager StateMachine, even if other
// executor has completed the transactions with id more.
- // We have 300 transactions, And for each volume we have transactions
- // of 150. Volume1 transactions 0 - 149 and Volume2 transactions 150 -
- // 299.
- // Example: Executor1 - Volume1 - 100 (current completed transaction)
- // Example: Executor2 - Volume2 - 299 (current completed transaction)
-
- // Now we have applied transactions of 0 - 100 and 149 - 299. We
- // cannot update lastAppliedIndex to 299. We need to update it to 100,
- // since 101 - 149 are not applied. When OM restarts it will
- // applyTransactions from lastAppliedIndex.
- // We can update the lastAppliedIndex to 100, and update it to 299,
- // only after completing 101 - 149. In initial stage, we are starting
- // with single global executor. Will revisit this when needed.
-
- // Add the term index and transaction log index to applyTransaction map
- // . This map will be used to update lastAppliedIndex.
-
- CompletableFuture<Message> ratisFuture =
- new CompletableFuture<>();
- applyTransactionMap.put(trxLogIndex, trx.getLogEntry().getTerm());
-
//if there are too many pending requests, wait for doubleBuffer flushing
ozoneManagerDoubleBuffer.acquireUnFlushedTransactions(1);
- CompletableFuture<OMResponse> future = CompletableFuture.supplyAsync(
- () -> runCommand(request, termIndex), executorService);
- future.thenApply(omResponse -> {
- if (!omResponse.getSuccess()) {
- // When INTERNAL_ERROR or METADATA_ERROR it is considered as
- // critical error and terminate the OM. Considering INTERNAL_ERROR
- // also for now because INTERNAL_ERROR is thrown for any error
- // which is not type OMException.
-
- // Not done future with completeExceptionally because if we do
- // that OM will still continue applying transaction until next
- // snapshot. So in OM case if a transaction failed with un
- // recoverable error and if we wait till snapshot to terminate
- // OM, then if some client requested the read transaction of the
- // failed request, there is a chance we shall give wrong result.
- // So, to avoid these kind of issue, we should terminate OM here.
- if (omResponse.getStatus() == INTERNAL_ERROR) {
- terminate(omResponse, OMException.ResultCodes.INTERNAL_ERROR);
- } else if (omResponse.getStatus() == METADATA_ERROR) {
- terminate(omResponse, OMException.ResultCodes.METADATA_ERROR);
- }
- }
-
- // For successful response and for all other errors which are not
- // critical, we can complete future normally.
- ratisFuture.complete(OMRatisHelper.convertResponseToMessage(
- omResponse));
- return ratisFuture;
- });
- return ratisFuture;
+ return CompletableFuture.supplyAsync(() -> runCommand(request,
termIndex), executorService)
+ .thenApply(this::processResponse);
} catch (Exception e) {
return completeExceptionally(e);
}
}
- /**
- * Terminate OM.
- * @param omResponse
- * @param resultCode
- */
- private void terminate(OMResponse omResponse,
- OMException.ResultCodes resultCode) {
+ private Message processResponse(OMResponse omResponse) {
+ if (!omResponse.getSuccess()) {
+ // INTERNAL_ERROR or METADATA_ERROR are considered as critical errors.
+ // In such cases, OM must be terminated instead of completing the future
exceptionally,
+ // Otherwise, OM may continue applying transactions which leads to an
inconsistent state.
+ if (omResponse.getStatus() == INTERNAL_ERROR) {
+ terminate(omResponse, OMException.ResultCodes.INTERNAL_ERROR);
+ } else if (omResponse.getStatus() == METADATA_ERROR) {
+ terminate(omResponse, OMException.ResultCodes.METADATA_ERROR);
+ }
+ }
+
+ // For successful response and non-critical errors, convert the response.
+ return OMRatisHelper.convertResponseToMessage(omResponse);
+ }
+
+ private static void terminate(OMResponse omResponse, OMException.ResultCodes
resultCode) {
OMException exception = new OMException(omResponse.getMessage(),
resultCode);
String errorMessage = "OM Ratis Server has received unrecoverable " +
@@ -474,7 +432,7 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
OMConfigKeys.OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT_DEFAULT);
return new OzoneManagerDoubleBuffer.Builder()
.setOmMetadataManager(ozoneManager.getMetadataManager())
- .setOzoneManagerRatisSnapShot(this::updateLastAppliedIndex)
+ .setUpdateLastAppliedIndex(this::updateLastAppliedTermIndex)
.setmaxUnFlushedTransactionCount(maxUnflushedTransactionSize)
.setThreadPrefix(threadPrefix)
.setS3SecretManager(ozoneManager.getS3SecretManager())
@@ -493,15 +451,35 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
*/
@Override
public long takeSnapshot() throws IOException {
- LOG.info("Current Snapshot Index {}", getLastAppliedTermIndex());
- TermIndex lastTermIndex = getLastAppliedTermIndex();
- final TransactionInfo build = TransactionInfo.valueOf(lastTermIndex);
- ozoneManager.setTransactionInfo(build);
- Table<String, TransactionInfo> txnInfoTable =
- ozoneManager.getMetadataManager().getTransactionInfoTable();
- txnInfoTable.put(TRANSACTION_INFO_KEY, build);
+ // wait until applied == skipped
+ while (getLastAppliedTermIndex().getIndex() < lastSkippedIndex) {
+ if (ozoneManager.isStopped()) {
+ throw new IOException("OzoneManager is already stopped: " +
ozoneManager.getNodeDetails());
+ }
+ try {
+ ozoneManagerDoubleBuffer.awaitFlush();
+ } catch (InterruptedException e) {
+ throw IOUtils.toInterruptedIOException("Interrupted
ozoneManagerDoubleBuffer.awaitFlush", e);
+ }
+ }
+
+ return takeSnapshotImpl();
+ }
+
+ private synchronized long takeSnapshotImpl() throws IOException {
+ final TermIndex applied = getLastAppliedTermIndex();
+ final TermIndex notified = getLastNotifiedTermIndex();
+ final TermIndex snapshot = applied.compareTo(notified) > 0 ? applied :
notified;
+ LOG.info(" applied = {}", applied);
+ LOG.info(" skipped = {}", lastSkippedIndex);
+ LOG.info("notified = {}", notified);
+ LOG.info("snapshot = {}", snapshot);
+
+ final TransactionInfo transactionInfo = TransactionInfo.valueOf(snapshot);
+ ozoneManager.setTransactionInfo(transactionInfo);
+
ozoneManager.getMetadataManager().getTransactionInfoTable().put(TRANSACTION_INFO_KEY,
transactionInfo);
ozoneManager.getMetadataManager().getStore().flushDB();
- return lastTermIndex.getIndex();
+ return snapshot.getIndex();
}
/**
@@ -541,13 +519,13 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
}
@Override
- public void close() throws IOException {
+ public void close() {
// OM should be shutdown as the StateMachine has shutdown.
- LOG.info("StateMachine has shutdown. Shutdown OzoneManager if not " +
- "already shutdown.");
if (!ozoneManager.isStopped()) {
+ LOG.info("Stopping {}. Shutdown also OzoneManager {}.", this,
ozoneManager);
ozoneManager.shutDown("OM state machine is shutdown by Ratis server");
} else {
+ LOG.info("Stopping {}.", this);
stop();
}
}
@@ -614,87 +592,6 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
return omResponse;
}
- /**
- * Update lastAppliedIndex term and it's corresponding term in the
- * stateMachine.
- * @param flushedEpochs
- */
- public void updateLastAppliedIndex(List<Long> flushedEpochs) {
- Preconditions.checkArgument(flushedEpochs.size() > 0);
- computeAndUpdateLastAppliedIndex(
- flushedEpochs.get(flushedEpochs.size() - 1), -1L, flushedEpochs, true);
- }
-
- /**
- * Update State machine lastAppliedTermIndex.
- * @param lastFlushedIndex
- * @param currentTerm
- * @param flushedEpochs - list of ratis transactions flushed to DB. If it
- * is just one index and term, this can be set to null.
- * @param checkMap - if true check applyTransactionMap, ratisTransaction
- * Map and update lastAppliedTermIndex accordingly, else check
- * lastAppliedTermIndex and update it.
- */
- private synchronized void computeAndUpdateLastAppliedIndex(
- long lastFlushedIndex, long currentTerm, List<Long> flushedEpochs,
- boolean checkMap) {
- if (checkMap) {
- List<Long> flushedTrans = new ArrayList<>(flushedEpochs);
- Long appliedTerm = null;
- long appliedIndex = -1;
- for (long i = getLastAppliedTermIndex().getIndex() + 1; ; i++) {
- if (flushedTrans.contains(i)) {
- appliedIndex = i;
- final Long removed = applyTransactionMap.remove(i);
- appliedTerm = removed;
- flushedTrans.remove(i);
- } else if (ratisTransactionMap.containsKey(i)) {
- final Long removed = ratisTransactionMap.remove(i);
- appliedTerm = removed;
- appliedIndex = i;
- } else {
- // Add remaining which are left in flushedEpochs to
- // ratisTransactionMap to be considered further.
- for (long epoch : flushedTrans) {
- ratisTransactionMap.put(epoch, applyTransactionMap.remove(epoch));
- }
- if (LOG.isDebugEnabled()) {
- if (!flushedTrans.isEmpty()) {
- LOG.debug("ComputeAndUpdateLastAppliedIndex due to SM added " +
- "to map remaining {}", flushedTrans);
- }
- }
- break;
- }
- }
- if (appliedTerm != null) {
- updateLastAppliedTermIndex(appliedTerm, appliedIndex);
- if (LOG.isDebugEnabled()) {
- LOG.debug("ComputeAndUpdateLastAppliedIndex due to SM is {}",
- getLastAppliedTermIndex());
- }
- }
- } else {
- if (getLastAppliedTermIndex().getIndex() + 1 == lastFlushedIndex) {
- updateLastAppliedTermIndex(currentTerm, lastFlushedIndex);
- if (LOG.isDebugEnabled()) {
- LOG.debug("ComputeAndUpdateLastAppliedIndex due to notifyIndex {}",
- getLastAppliedTermIndex());
- }
- } else {
- ratisTransactionMap.put(lastFlushedIndex, currentTerm);
- if (LOG.isDebugEnabled()) {
- LOG.debug("ComputeAndUpdateLastAppliedIndex due to notifyIndex " +
- "added to map. Passed Term {} index {}, where as lastApplied " +
- "Index {}", currentTerm, lastFlushedIndex,
- getLastAppliedTermIndex());
- }
- }
- }
- this.metrics.updateApplyTransactionMapSize(applyTransactionMap.size());
- this.metrics.updateRatisTransactionMapSize(ratisTransactionMap.size());
- }
-
public void loadSnapshotInfoFromDB() throws IOException {
// This is done, as we have a check in Ratis for not throwing
// LeaderNotReadyException, it checks stateMachineIndex >= raftLog
@@ -752,28 +649,11 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
ozoneManagerDoubleBuffer.stop();
HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5,
TimeUnit.SECONDS);
- LOG.info("applyTransactionMap size {} ", applyTransactionMap.size());
- if (LOG.isDebugEnabled()) {
- LOG.debug("applyTransactionMap {}",
- applyTransactionMap.keySet().stream().map(Object::toString)
- .collect(Collectors.joining(",")));
- }
- LOG.info("ratisTransactionMap size {}", ratisTransactionMap.size());
- if (LOG.isDebugEnabled()) {
- LOG.debug("ratisTransactionMap {}",
- ratisTransactionMap.keySet().stream().map(Object::toString)
- .collect(Collectors.joining(",")));
- }
if (metrics != null) {
metrics.unRegister();
}
}
- @VisibleForTesting
- void addApplyTransactionTermIndex(long term, long index) {
- applyTransactionMap.put(index, term);
- }
-
/**
* Wait until both buffers are flushed. This is used in cases like
* "follower bootstrap tarball creation" where the rocksDb for the active
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/metrics/OzoneManagerStateMachineMetrics.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/metrics/OzoneManagerStateMachineMetrics.java
index 8ab4fe883a..51d26ef7ac 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/metrics/OzoneManagerStateMachineMetrics.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/metrics/OzoneManagerStateMachineMetrics.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.om.ratis.metrics;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
@@ -63,16 +62,6 @@ public final class OzoneManagerStateMachineMetrics
implements MetricsSource {
}
}
- @VisibleForTesting
- public long getApplyTransactionMapSize() {
- return applyTransactionMapSize.value();
- }
-
- @VisibleForTesting
- public long getRatisTransactionMapSize() {
- return ratisTransactionMapSize.value();
- }
-
public void updateApplyTransactionMapSize(long size) {
this.applyTransactionMapSize.incr(
Math.negateExact(applyTransactionMapSize.value()) + size);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
index 053daa1c7c..a814459050 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -36,6 +37,7 @@ import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,16 +102,17 @@ public class OMPrepareRequest extends OMClientRequest {
OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer();
final RaftServer.Division division = omRatisServer.getServerDivision();
-
- // Wait for outstanding double buffer entries to flush to disk,
- // so they will not be purged from the log before being persisted to
- // the DB.
- // Since the response for this request was added to the double buffer
- // already, once this index reaches the state machine, we know all
- // transactions have been flushed.
- waitForLogIndex(transactionLogIndex, ozoneManager, division,
+ final OzoneManagerStateMachine stateMachine = (OzoneManagerStateMachine)
division.getStateMachine();
+
+ // Wait for outstanding double buffer entries
+ // - to be flushed to db, and
+ // - to be notified by Ratis.
+ // The log index returned, will be used as the prepare index, is the
last Ratis commit index
+ // which can be higher than the transactionLogIndex of this request.
+ final long prepareIndex = waitForLogIndex(transactionLogIndex,
ozoneManager, stateMachine,
flushTimeout, flushCheckInterval);
- takeSnapshotAndPurgeLogs(transactionLogIndex, division);
+ Preconditions.assertTrue(prepareIndex >= transactionLogIndex);
+ takeSnapshotAndPurgeLogs(prepareIndex, division);
// Save prepare index to a marker file, so if the OM restarts,
// it will remain in prepare mode as long as the file exists and its
@@ -150,11 +153,15 @@ public class OMPrepareRequest extends OMClientRequest {
}
/**
- * Waits for the specified index to be flushed to the state machine on
- * disk, and to be applied to Ratis's state machine.
+ * Waits for the specified index to be applied to {@link
OzoneManagerStateMachine}.
+ * Note that
+ * - the applied index is updated after the transaction is flushed to db.
+ * - after a transaction (i) is committed, ratis will append another
ratis-metadata transaction (i+1).
+ *
+ * @return the last Ratis commit index
*/
- private static void waitForLogIndex(long minOMDBFlushIndex,
- OzoneManager om, RaftServer.Division division,
+ private static long waitForLogIndex(long minOMDBFlushIndex,
+ OzoneManager om, OzoneManagerStateMachine stateMachine,
Duration flushTimeout, Duration flushCheckInterval)
throws InterruptedException, IOException {
@@ -168,8 +175,10 @@ public class OMPrepareRequest extends OMClientRequest {
// snapshot is taken.
// If we purge logs without waiting for this index, it may not make it to
// the RocksDB snapshot, and then the log entry is lost on this OM.
- long minRatisStateMachineIndex = minOMDBFlushIndex + 1;
+ long minRatisStateMachineIndex = minOMDBFlushIndex + 1; // for the
ratis-metadata transaction
long lastRatisCommitIndex = RaftLog.INVALID_LOG_INDEX;
+
+ // Wait OM state machine to apply the given index.
long lastOMDBFlushIndex = RaftLog.INVALID_LOG_INDEX;
LOG.info("{} waiting for index {} to flush to OM DB and index {} to flush"
+
@@ -184,8 +193,7 @@ public class OMPrepareRequest extends OMClientRequest {
lastOMDBFlushIndex);
// Check ratis state machine.
- lastRatisCommitIndex =
- division.getStateMachine().getLastAppliedTermIndex().getIndex();
+ lastRatisCommitIndex =
stateMachine.getLastNotifiedTermIndex().getIndex();
ratisStateMachineApplied = (lastRatisCommitIndex >=
minRatisStateMachineIndex);
LOG.debug("{} Current Ratis state machine transaction index {}.",
@@ -211,6 +219,7 @@ public class OMPrepareRequest extends OMClientRequest {
flushTimeout.getSeconds(), lastRatisCommitIndex,
minRatisStateMachineIndex));
}
+ return lastRatisCommitIndex;
}
/**
@@ -226,6 +235,7 @@ public class OMPrepareRequest extends OMClientRequest {
RaftServer.Division division) throws IOException {
StateMachine stateMachine = division.getStateMachine();
long snapshotIndex = stateMachine.takeSnapshot();
+ LOG.info("takeSnapshot at {} for prepareIndex {}", snapshotIndex,
prepareIndex);
if (snapshotIndex < prepareIndex) {
throw new IOException(String.format("OM DB snapshot index %d is less " +
@@ -245,12 +255,6 @@ public class OMPrepareRequest extends OMClientRequest {
"match specified purge index {}. ", actualPurgeIndex,
snapshotIndex);
}
-
- if (actualPurgeIndex < prepareIndex) {
- throw new IOException(String.format("Actual purge index %d is less " +
- "than prepare index %d. Some required logs may not have" +
- " been removed.", actualPurgeIndex, prepareIndex));
- }
} catch (ExecutionException e) {
// Ozone manager error handler does not respect exception chaining and
// only displays the message of the top level exception.
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index bebb2b4324..a4ec2c2f20 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -115,11 +115,6 @@ public class OzoneManagerProtocolServerSideTranslatorPB
implements
} else {
this.ozoneManagerDoubleBuffer = new OzoneManagerDoubleBuffer.Builder()
.setOmMetadataManager(ozoneManager.getMetadataManager())
- // Do nothing.
- // For OM NON-HA code, there is no need to save transaction index.
- // As we wait until the double buffer flushes DB to disk.
- .setOzoneManagerRatisSnapShot((i) -> {
- })
.enableRatis(isRatisEnabled)
.enableTracing(TracingUtil.isTracingEnabled(
ozoneManager.getConfiguration()))
@@ -367,13 +362,11 @@ public class OzoneManagerProtocolServerSideTranslatorPB
implements
ozoneManagerDoubleBuffer.awaitFlush();
}
- @VisibleForTesting
- public OzoneManagerDoubleBuffer getOzoneManagerDoubleBuffer() {
- return ozoneManagerDoubleBuffer;
- }
-
@VisibleForTesting
public void setShouldFlushCache(boolean shouldFlushCache) {
+ if (ozoneManagerDoubleBuffer != null) {
+ ozoneManagerDoubleBuffer.stopDaemon();
+ }
this.shouldFlushCache = shouldFlushCache;
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
index 859d54eb5f..202234a0d4 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
@@ -135,15 +135,12 @@ class TestOzoneManagerDoubleBuffer {
when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
- OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot = index -> {
- };
flushNotifier = new OzoneManagerDoubleBuffer.FlushNotifier();
spyFlushNotifier = spy(flushNotifier);
doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
.setOmMetadataManager(omMetadataManager)
.setS3SecretManager(secretManager)
- .setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
.setmaxUnFlushedTransactionCount(1000)
.enableRatis(true)
.setFlushNotifier(spyFlushNotifier)
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
index bdff3c0467..635f86f3ab 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
@@ -64,7 +64,6 @@ public class TestOzoneManagerDoubleBufferWithDummyResponse {
private OMMetadataManager omMetadataManager;
private OzoneManagerDoubleBuffer doubleBuffer;
private final AtomicLong trxId = new AtomicLong(0);
- private long lastAppliedIndex;
private long term = 1L;
@TempDir
private Path folder;
@@ -76,12 +75,8 @@ public class TestOzoneManagerDoubleBufferWithDummyResponse {
folder.toAbsolutePath().toString());
omMetadataManager =
new OmMetadataManagerImpl(configuration, null);
- OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot = index -> {
- lastAppliedIndex = index.get(index.size() - 1);
- };
doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
.setOmMetadataManager(omMetadataManager)
- .setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
.setmaxUnFlushedTransactionCount(10000)
.enableRatis(true)
.build();
@@ -133,15 +128,11 @@ public class
TestOzoneManagerDoubleBufferWithDummyResponse {
OzoneManagerDoubleBufferMetrics.create();
assertEquals(metrics, metricsCopy);
- // Check lastAppliedIndex is updated correctly or not.
- assertEquals(bucketCount, lastAppliedIndex);
-
-
TransactionInfo transactionInfo =
omMetadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+ // Check lastAppliedIndex is updated correctly or not.
assertNotNull(transactionInfo);
-
- assertEquals(lastAppliedIndex, transactionInfo.getTransactionIndex());
+ assertEquals(bucketCount, transactionInfo.getTransactionIndex());
assertEquals(term, transactionInfo.getTerm());
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
index 2d58088e25..7178868dcf 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
@@ -87,8 +87,6 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
private OMMetadataManager omMetadataManager;
private OzoneManagerDoubleBuffer doubleBuffer;
private final AtomicLong trxId = new AtomicLong(0);
- private OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot;
- private volatile long lastAppliedIndex;
private long term = 1L;
@TempDir
@@ -109,12 +107,8 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
auditLogger = mock(AuditLogger.class);
when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
- ozoneManagerRatisSnapshot = index -> {
- lastAppliedIndex = index.get(index.size() - 1);
- };
doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
.setOmMetadataManager(omMetadataManager)
- .setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
.setmaxUnFlushedTransactionCount(100000)
.enableRatis(true)
.build();
@@ -198,16 +192,21 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
checkDeletedBuckets(deleteBucketQueue);
// Check lastAppliedIndex is updated correctly or not.
- GenericTestUtils.waitFor(() ->
- bucketCount + deleteCount + 1 == lastAppliedIndex,
+ final long expectedIndex = bucketCount + deleteCount + 1;
+ GenericTestUtils.waitFor(() -> assertTransactionInfo(expectedIndex),
100, 30000);
+ }
- TransactionInfo transactionInfo =
- omMetadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
- assertNotNull(transactionInfo);
-
- assertEquals(lastAppliedIndex, transactionInfo.getTransactionIndex());
- assertEquals(term, transactionInfo.getTerm());
+ private boolean assertTransactionInfo(long lastAppliedIndex) {
+ final TransactionInfo info;
+ try {
+ info =
omMetadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+ } catch (IOException e) {
+ return false;
+ }
+ return info != null
+ && info.getTransactionIndex() == lastAppliedIndex
+ && info.getTerm() == term;
}
/**
@@ -275,7 +274,8 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
// running in parallel, so lastAppliedIndex cannot be always
// total transaction count. So, just checking here whether it is less
// than total transaction count.
- assertThat(lastAppliedIndex).isLessThanOrEqualTo(bucketCount + deleteCount
+ 2);
+ final TransactionInfo info =
omMetadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+ assertThat(info.getTransactionIndex()).isLessThanOrEqualTo(bucketCount +
deleteCount + 2);
}
/**
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
index bb9e4ee9b9..3ea7c51264 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
@@ -36,14 +36,14 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInf
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.TransactionContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -91,154 +91,43 @@ public class TestOzoneManagerStateMachine {
when(ozoneManager.getConfiguration()).thenReturn(conf);
ozoneManagerStateMachine =
new OzoneManagerStateMachine(ozoneManagerRatisServer, false);
- ozoneManagerStateMachine.notifyTermIndexUpdated(0, 0);
+ }
+
+ static void assertTermIndex(long expectedTerm, long expectedIndex, TermIndex
computed) {
+ assertEquals(expectedTerm, computed.getTerm());
+ assertEquals(expectedIndex, computed.getIndex());
}
@Test
public void testLastAppliedIndex() {
-
- // Happy scenario.
+ ozoneManagerStateMachine.notifyTermIndexUpdated(0, 0);
+ assertTermIndex(0, RaftLog.INVALID_LOG_INDEX,
ozoneManagerStateMachine.getLastAppliedTermIndex());
+ assertTermIndex(0, 0, ozoneManagerStateMachine.getLastNotifiedTermIndex());
// Conf/metadata transaction.
ozoneManagerStateMachine.notifyTermIndexUpdated(0, 1);
- assertEquals(0,
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
- assertEquals(1,
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
-
- List<Long> flushedEpochs = new ArrayList<>();
-
- // Add some apply transactions.
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0, 2);
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0, 3);
-
- flushedEpochs.add(2L);
- flushedEpochs.add(3L);
+ assertTermIndex(0, RaftLog.INVALID_LOG_INDEX,
ozoneManagerStateMachine.getLastAppliedTermIndex());
+ assertTermIndex(0, 1, ozoneManagerStateMachine.getLastNotifiedTermIndex());
// call update last applied index
- ozoneManagerStateMachine.updateLastAppliedIndex(flushedEpochs);
+ ozoneManagerStateMachine.updateLastAppliedTermIndex(TermIndex.valueOf(0,
2));
+ ozoneManagerStateMachine.updateLastAppliedTermIndex(TermIndex.valueOf(0,
3));
- assertEquals(0,
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
- assertEquals(3,
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
+ assertTermIndex(0, 3, ozoneManagerStateMachine.getLastAppliedTermIndex());
+ assertTermIndex(0, 1, ozoneManagerStateMachine.getLastNotifiedTermIndex());
// Conf/metadata transaction.
- ozoneManagerStateMachine.notifyTermIndexUpdated(0L, 4L);
+ ozoneManagerStateMachine.notifyTermIndexUpdated(1L, 4L);
- assertEquals(0L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
- assertEquals(4L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
+ assertTermIndex(0, 3, ozoneManagerStateMachine.getLastAppliedTermIndex());
+ assertTermIndex(1, 4, ozoneManagerStateMachine.getLastNotifiedTermIndex());
// Add some apply transactions.
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 5L);
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 6L);
-
- flushedEpochs.clear();
- flushedEpochs.add(5L);
- flushedEpochs.add(6L);
- ozoneManagerStateMachine.updateLastAppliedIndex(flushedEpochs);
-
- assertEquals(0L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
- assertEquals(6L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
-
-
- }
-
-
- @Test
- public void testApplyTransactionsUpdateLastAppliedIndexCalledLate() {
- // Now try a scenario where 1,2,3 transactions are in applyTransactionMap
- // and updateLastAppliedIndex is not called for them, and before that
- // notifyTermIndexUpdated is called with transaction 4. And see now at the
- // end when updateLastAppliedIndex is called with epochs we have
- // lastAppliedIndex as 4 or not.
-
- // Conf/metadata transaction.
- ozoneManagerStateMachine.notifyTermIndexUpdated(0, 1);
- assertEquals(0,
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
- assertEquals(1,
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
-
-
-
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 2L);
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 3L);
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 4L);
-
-
-
- // Conf/metadata transaction.
- ozoneManagerStateMachine.notifyTermIndexUpdated(0L, 5L);
-
- // Still it should be zero, as for 2,3,4 updateLastAppliedIndex is not yet
- // called so the lastAppliedIndex will be at older value.
- assertEquals(0L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
- assertEquals(1L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
-
- List<Long> flushedEpochs = new ArrayList<>();
-
-
- flushedEpochs.add(2L);
- flushedEpochs.add(3L);
- flushedEpochs.add(4L);
-
- ozoneManagerStateMachine.updateLastAppliedIndex(flushedEpochs);
-
- assertEquals(0L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
- assertEquals(5L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
-
- }
-
-
- @Test
- public void testLastAppliedIndexWithMultipleExecutors() {
-
- // first flush batch
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 1L);
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 2L);
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 4L);
-
- List<Long> flushedEpochs = new ArrayList<>();
-
-
- flushedEpochs.add(1L);
- flushedEpochs.add(2L);
- flushedEpochs.add(4L);
-
- ozoneManagerStateMachine.updateLastAppliedIndex(flushedEpochs);
-
- assertEquals(0L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
- assertEquals(2L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
-
-
-
-
- // 2nd flush batch
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 3L);
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 5L);
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 6L);
-
- flushedEpochs.clear();
- flushedEpochs.add(3L);
- flushedEpochs.add(5L);
- flushedEpochs.add(6L);
-
- ozoneManagerStateMachine.updateLastAppliedIndex(flushedEpochs);
-
- assertEquals(0L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
- assertEquals(6L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
-
- // 3rd flush batch
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 7L);
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 8L);
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 9L);
- ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 10L);
-
- flushedEpochs.clear();
- flushedEpochs.add(7L);
- flushedEpochs.add(8L);
- flushedEpochs.add(9L);
- flushedEpochs.add(10L);
-
- ozoneManagerStateMachine.updateLastAppliedIndex(flushedEpochs);
+ ozoneManagerStateMachine.updateLastAppliedTermIndex(TermIndex.valueOf(1L,
5L));
+ ozoneManagerStateMachine.updateLastAppliedTermIndex(TermIndex.valueOf(1L,
6L));
- assertEquals(0L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
- assertEquals(10L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
+ assertTermIndex(1, 6, ozoneManagerStateMachine.getLastAppliedTermIndex());
+ assertTermIndex(1, 4, ozoneManagerStateMachine.getLastNotifiedTermIndex());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]