bharatviswa504 commented on a change in pull request #1613: URL: https://github.com/apache/ozone/pull/1613#discussion_r533779978
########## File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.request.upgrade; + +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper; +import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.response.upgrade.OMPrepareResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; + +import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY; +import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; + +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; +import org.apache.ratis.server.raftlog.RaftLog; +import org.apache.ratis.statemachine.StateMachine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.CompletableFuture; + +/** + * OM Request used to flush all transactions to disk, take a DB snapshot, and + * purge the logs, leaving Ratis in a clean state without unapplied log + * entries. This prepares the OM for upgrades/downgrades so that no request + * in the log is applied to the database in the old version of the code in one + * OM, and the new version of the code on another OM. + */ +public class OMPrepareRequest extends OMClientRequest { + private static final Logger LOG = + LoggerFactory.getLogger(OMPrepareRequest.class); + + // Allow double buffer this many seconds to flush all transactions before + // returning an error to the caller. + private static final Duration DOUBLE_BUFFER_FLUSH_TIMEOUT = + Duration.of(5, ChronoUnit.MINUTES); + // Time between checks to see if double buffer finished flushing. + private static final Duration DOUBLE_BUFFER_FLUSH_CHECK_INTERVAL = + Duration.of(1, ChronoUnit.SECONDS); + + public OMPrepareRequest(OMRequest omRequest) { + super(omRequest); + } + + @Override + public OMClientResponse validateAndUpdateCache( + OzoneManager ozoneManager, long transactionLogIndex, + OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) { + + LOG.info("Received prepare request with log index {}", transactionLogIndex); + + OMResponse.Builder responseBuilder = + OmResponseUtil.getOMResponseBuilder(getOmRequest()); + responseBuilder.setCmdType(Type.Prepare); + OMClientResponse response = null; + + try { + // Create response. 
+ PrepareResponse omResponse = PrepareResponse.newBuilder() + .setTxnID(transactionLogIndex) + .build(); + responseBuilder.setPrepareResponse(omResponse); + response = new OMPrepareResponse(responseBuilder.build()); + + // Add response to double buffer before clearing logs. + // This guarantees the log index of this request will be the same as + // the snapshot index in the prepared state. + ozoneManagerDoubleBufferHelper.add(response, transactionLogIndex); + + // Wait for outstanding double buffer entries to flush to disk, + // so they will not be purged from the log before being persisted to + // the DB. + // Since the response for this request was added to the double buffer + // already, once this index reaches the state machine, we know all + // transactions have been flushed. + waitForDoubleBufferFlush(ozoneManager, transactionLogIndex); + + OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer(); + RaftServerProxy server = (RaftServerProxy) omRatisServer.getServer(); + RaftServerImpl serverImpl = + server.getImpl(omRatisServer.getRaftGroup().getGroupId()); + + takeSnapshotAndPurgeLogs(((RaftServerProxy) omRatisServer.getServer()) Review comment: Minor: We can pass the serverImpl which we got in L106 to takeSnapshotAndPurgeLogs ########## File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.request.upgrade; + +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper; +import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.response.upgrade.OMPrepareResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; + +import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY; +import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; + +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; +import org.apache.ratis.server.raftlog.RaftLog; +import org.apache.ratis.statemachine.StateMachine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.CompletableFuture; + +/** + * OM Request used to flush all transactions to disk, take a DB snapshot, and + * purge 
the logs, leaving Ratis in a clean state without unapplied log + * entries. This prepares the OM for upgrades/downgrades so that no request + * in the log is applied to the database in the old version of the code in one + * OM, and the new version of the code on another OM. + */ +public class OMPrepareRequest extends OMClientRequest { + private static final Logger LOG = + LoggerFactory.getLogger(OMPrepareRequest.class); + + // Allow double buffer this many seconds to flush all transactions before + // returning an error to the caller. + private static final Duration DOUBLE_BUFFER_FLUSH_TIMEOUT = + Duration.of(5, ChronoUnit.MINUTES); + // Time between checks to see if double buffer finished flushing. + private static final Duration DOUBLE_BUFFER_FLUSH_CHECK_INTERVAL = + Duration.of(1, ChronoUnit.SECONDS); + + public OMPrepareRequest(OMRequest omRequest) { + super(omRequest); + } + + @Override + public OMClientResponse validateAndUpdateCache( + OzoneManager ozoneManager, long transactionLogIndex, + OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) { + + LOG.info("Received prepare request with log index {}", transactionLogIndex); + + OMResponse.Builder responseBuilder = + OmResponseUtil.getOMResponseBuilder(getOmRequest()); + responseBuilder.setCmdType(Type.Prepare); + OMClientResponse response = null; + + try { + // Create response. + PrepareResponse omResponse = PrepareResponse.newBuilder() + .setTxnID(transactionLogIndex) + .build(); + responseBuilder.setPrepareResponse(omResponse); + response = new OMPrepareResponse(responseBuilder.build()); + + // Add response to double buffer before clearing logs. + // This guarantees the log index of this request will be the same as + // the snapshot index in the prepared state. + ozoneManagerDoubleBufferHelper.add(response, transactionLogIndex); + + // Wait for outstanding double buffer entries to flush to disk, + // so they will not be purged from the log before being persisted to + // the DB. 
+ // Since the response for this request was added to the double buffer + // already, once this index reaches the state machine, we know all + // transactions have been flushed. + waitForDoubleBufferFlush(ozoneManager, transactionLogIndex); + + OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer(); + RaftServerProxy server = (RaftServerProxy) omRatisServer.getServer(); + RaftServerImpl serverImpl = + server.getImpl(omRatisServer.getRaftGroup().getGroupId()); + + takeSnapshotAndPurgeLogs(((RaftServerProxy) omRatisServer.getServer()) + .getImpl(omRatisServer.getRaftGroup().getGroupId())); + + // TODO: Create marker file with txn index. + + LOG.info("OM prepared at log index {}. Returning response {}", + ozoneManager.getRatisSnapshotIndex(), omResponse); + } catch (IOException e) { + response = new OMPrepareResponse( + createErrorOMResponse(responseBuilder, e)); + } catch (InterruptedException e) { + response = new OMPrepareResponse( + createErrorOMResponse(responseBuilder, new OMException(e, + OMException.ResultCodes.INTERNAL_ERROR))); + } + + return response; + } + + private static void waitForDoubleBufferFlush( + OzoneManager ozoneManager, long txnLogIndex) + throws InterruptedException, IOException { + + long endTime = System.currentTimeMillis() + + DOUBLE_BUFFER_FLUSH_TIMEOUT.toMillis(); + boolean success = false; + + while (!success && System.currentTimeMillis() < endTime) { + // If no transactions have been persisted to the DB, transaction info + // will be null, not zero, causing a null pointer exception within + // ozoneManager#getRatisSnaphotIndex. + // Get the transaction directly instead. 
+ OMTransactionInfo txnInfo = ozoneManager.getMetadataManager() + .getTransactionInfoTable().get(TRANSACTION_INFO_KEY); + if (txnInfo == null) { + success = (txnLogIndex == 0); Review comment: Why do we need to return success in this case? When the PrepareRequest is added to the double buffer, the double buffer will flush the PrepareRequest transaction to the DB even if there have been no other requests to OM at all. Also, I did not understand the reasoning behind the null pointer exception part. ########## File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License.
+ */ + +package org.apache.hadoop.ozone.om.request.upgrade; + +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper; +import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.response.upgrade.OMPrepareResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; + +import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY; +import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; + +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; +import org.apache.ratis.server.raftlog.RaftLog; +import org.apache.ratis.statemachine.StateMachine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.CompletableFuture; + +/** + * OM Request used to flush all transactions to disk, take a DB snapshot, and + * purge the logs, leaving Ratis in a clean state without unapplied log + * entries. This prepares the OM for upgrades/downgrades so that no request + * in the log is applied to the database in the old version of the code in one + * OM, and the new version of the code on another OM. 
+ */ +public class OMPrepareRequest extends OMClientRequest { + private static final Logger LOG = + LoggerFactory.getLogger(OMPrepareRequest.class); + + // Allow double buffer this many seconds to flush all transactions before + // returning an error to the caller. + private static final Duration DOUBLE_BUFFER_FLUSH_TIMEOUT = + Duration.of(5, ChronoUnit.MINUTES); + // Time between checks to see if double buffer finished flushing. + private static final Duration DOUBLE_BUFFER_FLUSH_CHECK_INTERVAL = + Duration.of(1, ChronoUnit.SECONDS); + + public OMPrepareRequest(OMRequest omRequest) { + super(omRequest); + } + + @Override + public OMClientResponse validateAndUpdateCache( + OzoneManager ozoneManager, long transactionLogIndex, + OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) { + + LOG.info("Received prepare request with log index {}", transactionLogIndex); + + OMResponse.Builder responseBuilder = + OmResponseUtil.getOMResponseBuilder(getOmRequest()); + responseBuilder.setCmdType(Type.Prepare); + OMClientResponse response = null; + + try { + // Create response. + PrepareResponse omResponse = PrepareResponse.newBuilder() + .setTxnID(transactionLogIndex) + .build(); + responseBuilder.setPrepareResponse(omResponse); + response = new OMPrepareResponse(responseBuilder.build()); + + // Add response to double buffer before clearing logs. + // This guarantees the log index of this request will be the same as + // the snapshot index in the prepared state. + ozoneManagerDoubleBufferHelper.add(response, transactionLogIndex); + + // Wait for outstanding double buffer entries to flush to disk, + // so they will not be purged from the log before being persisted to + // the DB. + // Since the response for this request was added to the double buffer + // already, once this index reaches the state machine, we know all + // transactions have been flushed. 
+ waitForDoubleBufferFlush(ozoneManager, transactionLogIndex); + + OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer(); + RaftServerProxy server = (RaftServerProxy) omRatisServer.getServer(); + RaftServerImpl serverImpl = + server.getImpl(omRatisServer.getRaftGroup().getGroupId()); + + takeSnapshotAndPurgeLogs(((RaftServerProxy) omRatisServer.getServer()) + .getImpl(omRatisServer.getRaftGroup().getGroupId())); + + // TODO: Create marker file with txn index. + + LOG.info("OM prepared at log index {}. Returning response {}", + ozoneManager.getRatisSnapshotIndex(), omResponse); + } catch (IOException e) { + response = new OMPrepareResponse( + createErrorOMResponse(responseBuilder, e)); + } catch (InterruptedException e) { + response = new OMPrepareResponse( + createErrorOMResponse(responseBuilder, new OMException(e, + OMException.ResultCodes.INTERNAL_ERROR))); + } + + return response; + } + + private static void waitForDoubleBufferFlush( + OzoneManager ozoneManager, long txnLogIndex) + throws InterruptedException, IOException { + + long endTime = System.currentTimeMillis() + + DOUBLE_BUFFER_FLUSH_TIMEOUT.toMillis(); + boolean success = false; + + while (!success && System.currentTimeMillis() < endTime) { + // If no transactions have been persisted to the DB, transaction info + // will be null, not zero, causing a null pointer exception within + // ozoneManager#getRatisSnaphotIndex. + // Get the transaction directly instead. + OMTransactionInfo txnInfo = ozoneManager.getMetadataManager() + .getTransactionInfoTable().get(TRANSACTION_INFO_KEY); + if (txnInfo == null) { + success = (txnLogIndex == 0); + } else { + success = (txnInfo.getTransactionIndex() == txnLogIndex); + } + + Thread.sleep(DOUBLE_BUFFER_FLUSH_CHECK_INTERVAL.toMillis()); + } + + // If the timeout waiting for all transactions to reach the state machine + // is exceeded, the exception is propagated, resulting in an error response + // to the client. They can retry the prepare request. 
+ if (!success) { + throw new IOException(String.format("After waiting for %d seconds, " + + "State Machine has not applied all the transactions.", + DOUBLE_BUFFER_FLUSH_TIMEOUT.toMillis() * 1000)); + } + } + + /** + * Take a snapshot of the state machine at the last index, and purge ALL logs. + * @param impl RaftServerImpl instance + * @throws IOException on Error. + */ + public static long takeSnapshotAndPurgeLogs(RaftServerImpl impl) + throws IOException { + + StateMachine stateMachine = impl.getStateMachine(); + long snapshotIndex = stateMachine.takeSnapshot(); + + // If the snapshot indices from Ratis and the state machine do not match, + // the exception is propagated, resulting in an error response to the + // client. They can retry the prepare request. + if (snapshotIndex != stateMachine.getLastAppliedTermIndex().getIndex()) { + throw new IOException("Index from Snapshot does not match last applied " + + "Index"); + } + + RaftLog raftLog = impl.getState().getLog(); + // In order to get rid of all logs, make sure we also account for + // intermediate Ratis entries that do not pertain to OM. + long lastIndex = Math.max(snapshotIndex, Review comment: Question: According to design flow, in preAppend we set the prepare flag to true so that no new transactions will be accepted. In which scenario will we have logs more than snapshotIndex and if we purge them those requests will not receive any response from OM leader and timed out? ########## File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.request.upgrade; + +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper; +import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.response.upgrade.OMPrepareResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; + +import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY; +import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; + +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; +import org.apache.ratis.server.raftlog.RaftLog; +import org.apache.ratis.statemachine.StateMachine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.CompletableFuture; + +/** + * OM Request used to flush all transactions to disk, take a DB snapshot, and + * purge 
the logs, leaving Ratis in a clean state without unapplied log + * entries. This prepares the OM for upgrades/downgrades so that no request + * in the log is applied to the database in the old version of the code in one + * OM, and the new version of the code on another OM. + */ +public class OMPrepareRequest extends OMClientRequest { + private static final Logger LOG = + LoggerFactory.getLogger(OMPrepareRequest.class); + + // Allow double buffer this many seconds to flush all transactions before + // returning an error to the caller. + private static final Duration DOUBLE_BUFFER_FLUSH_TIMEOUT = + Duration.of(5, ChronoUnit.MINUTES); + // Time between checks to see if double buffer finished flushing. + private static final Duration DOUBLE_BUFFER_FLUSH_CHECK_INTERVAL = + Duration.of(1, ChronoUnit.SECONDS); + + public OMPrepareRequest(OMRequest omRequest) { + super(omRequest); + } + + @Override + public OMClientResponse validateAndUpdateCache( Review comment: In the design flow, the prepare flag is set in preAppend. Will that be handled in a separate Jira? ########## File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/upgrade/OMPrepareResponse.java ########## @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.response.upgrade; + +import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.response.CleanupTableInfo; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; + +import java.io.IOException; + +/** + * Response for prepare request. + */ +@CleanupTableInfo(cleanupAll = true) Review comment: Why cleanupAll is true in this case, as this has not touched any DB, so it can be false right? ########## File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.ozone.om; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY; + +import java.io.File; +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.TestHelper; +import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareRequest; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientRequest; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test OM prepare against actual mini cluster. 
+ */ +public class TestOzoneManagerPrepare extends TestOzoneManagerHA { + + private final String keyPrefix = "key"; + private final int timeoutMillis = 30000; + + /** + * Calls prepare on all OMs when they have no transaction information. + * Checks that they are brought into prepare mode successfully. + */ + @Test + public void testPrepareWithoutTransactions() throws Exception { + MiniOzoneHAClusterImpl cluster = getCluster(); + OzoneManager leader = cluster.getOMLeader(); + OMResponse omResponse = submitPrepareRequest(leader.getOmRatisServer()); + // Get the log index of the prepare request. + long prepareRequestLogIndex = + omResponse.getPrepareResponse().getTxnID(); + + // Prepare response processing is included in the snapshot, + // giving index of 1. + Assert.assertEquals(1, prepareRequestLogIndex); + for (OzoneManager om: cluster.getOzoneManagersList()) { + // Leader should be prepared as soon as it returns response. + if (om == leader) { + checkPrepared(om, prepareRequestLogIndex); + } else { + waitAndCheckPrepared(om, prepareRequestLogIndex); + } + } + } + + /** + * Writes data to the cluster via the leader OM, and then prepares it. + * Checks that every OM is prepared successfully. + */ + @Test + public void testPrepareWithTransactions() throws Exception { + MiniOzoneHAClusterImpl cluster = getCluster(); + OzoneClient ozClient = OzoneClientFactory.getRpcClient(getConf()); + + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + ObjectStore store = ozClient.getObjectStore(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + + Set<String> writtenKeys = new HashSet<>(); + for (int i = 1; i <= 10; i++) { + String keyName = keyPrefix + i; + writeTestData(store, volumeName, bucketName, keyName); + writtenKeys.add(keyName); + } + + // Make sure all OMs have logs from writing data, so we can check that + // they are purged after prepare. 
+ for (OzoneManager om: cluster.getOzoneManagersList()) { + LambdaTestUtils.await(timeoutMillis, 1000, + () -> logFilesPresentInRatisPeer(om)); + } + + OzoneManager leader = cluster.getOMLeader(); + OMResponse omResponse = submitPrepareRequest(leader.getOmRatisServer()); + // Get the log index of the prepare request. + long prepareRequestLogIndex = + omResponse.getPrepareResponse().getTxnID(); + + // Make sure all OMs are prepared and all OMs still have their data. + for (OzoneManager om: cluster.getOzoneManagersList()) { + // Leader should be prepared as soon as it returns response. + if (om == leader) { + checkPrepared(om, prepareRequestLogIndex); + } else { + waitAndCheckPrepared(om, prepareRequestLogIndex); + } + + List<OmKeyInfo> keys = om.getMetadataManager().listKeys(volumeName, + bucketName, null, keyPrefix, 100); + + Assert.assertEquals(writtenKeys.size(), keys.size()); + for (OmKeyInfo keyInfo: keys) { + Assert.assertTrue(writtenKeys.contains(keyInfo.getKeyName())); + } + } + } + + /** + * Writes data to the cluster. + * Shuts down one OM. + * Writes more data to the cluster. + * Submits prepare as ratis request. + * Checks that two live OMs are prepared. + * Revives the third OM + * Checks that third OM received all transactions and is prepared. + * @throws Exception + */ Review comment: Is this failing test that needs more changes? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
