errose28 commented on code in PR #10236:
URL: https://github.com/apache/ozone/pull/10236#discussion_r3228623141
##########
hadoop-ozone/integration-test/src/test/resources/auditlog.properties:
##########
@@ -52,27 +52,34 @@ filter.write.onMismatch = NEUTRAL
# TRACE (least specific, a lot of data)
# ALL (least specific, all data)
-appenders = console, audit
+appenders = console, audit, systemaudit
appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{DEFAULT} | %-5level | %c{1} | %msg |
%throwable{3} %n
+appender.console.layout.pattern = %-5level | %c{1} | %msg%n
Review Comment:
Why was the pattern for both audit loggers changed? Can we use the original
pattern for both?
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeRequest.java:
##########
@@ -95,6 +96,9 @@ public OMClientResponse validateAndUpdateCache(OzoneManager
ozoneManager, Execut
omMetadataManager.getMetaTable().addCacheEntry(
new CacheKey<>(APPARENT_VERSION_KEY),
CacheValue.get(context.getIndex(), String.valueOf(apparentVersion)));
+ // Clear the finalization_in_progress key from the cache
+ omMetadataManager.getMetaTable().addCacheEntry(
+ new CacheKey<>(OzoneConsts.FINALIZATION_IN_PROGRESS_KEY),
CacheValue.get(System.nanoTime()));
Review Comment:
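Looking at other examples, it looks like the `epoch` parameter is the Ratis
transaction index, so this entry should probably pass `context.getIndex()` (as
the `APPARENT_VERSION_KEY` entry just above does) instead of `System.nanoTime()`.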
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java:
##########
@@ -297,7 +297,7 @@ public void testInstallSnapshot(@TempDir Path tempDir)
throws Exception {
String toMatch = String.format(
"op=DB_CHECKPOINT_INSTALL
{\"leaderId\":\"%s\",\"term\":\"%d\",\"lastAppliedIndex\":\"%d\"}",
leaderOMNodeId, leaderOMSnapshotTermIndex, followerOMLastAppliedIndex);
- assertTrue(AuditLogTestUtils.auditLogContains(toMatch));
+ assertTrue(AuditLogTestUtils.systemAuditLogContains(toMatch));
Review Comment:
Why was this change required? We didn't modify the `DB_CHECKPOINT_INSTALL`
operation.
##########
hadoop-hdds/framework/src/test/java/org/apache/hadoop/ozone/audit/AuditLogTestUtils.java:
##########
@@ -72,6 +91,7 @@ public static boolean auditLogContains(String... strings) {
public static void truncateAuditLogFile() throws IOException {
Files.write(Paths.get(AUDITLOG_FILENAME), new byte[0]);
+ Files.write(Paths.get(SYSTEM_AUDITLOG_FILENAME), new byte[0]);
Review Comment:
We should probably add the same for `deleteAuditLogFile` below.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java:
##########
@@ -1900,6 +1908,10 @@ public void start() throws IOException {
bootstrap(omNodeDetails);
}
+ if (omUpgradeFinalizeService != null) {
Review Comment:
This is temporary until it is triggered by a client command, correct?
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizeService.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.upgrade;
+
+import static org.apache.hadoop.ozone.OzoneConsts.FINALIZATION_IN_PROGRESS_KEY;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ScmClient;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A background service that periodically checks whether SCM has completed
finalization of an upgrade and, if so,
+ * finalizes the OM upgrade.
+ */
+public class OMUpgradeFinalizeService extends BackgroundService {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OMUpgradeFinalizeService.class);
+
+ private static final int THREAD_POOL_SIZE = 1;
+ private static final TimeUnit INTERVAL_UNIT = TimeUnit.MILLISECONDS;
+ private static final long TIMEOUT = 60000;
+ private static final AtomicLong RUN_COUNT = new AtomicLong(0);
+
+ private final OzoneManager ozoneManager;
+ private final OMVersionManager versionManager;
+ private final ScmClient scmClient;
+ private final AtomicBoolean stopInitiated = new AtomicBoolean(false);
+
+ /**
+ * Creates an {@code OMUpgradeFinalizeService} with a custom check interval.
+ *
+ * @param ozoneManager the OzoneManager instance
+ * @param versionManager the {@link OMVersionManager} to query the
finalization status
+ * @param scmClient the scmClient instance used to query SCM
+ * @param intervalMs the duration to wait between checks
+ */
+ public OMUpgradeFinalizeService(OzoneManager ozoneManager, OMVersionManager
versionManager, ScmClient scmClient,
+ long intervalMs) {
+ super("OMUpgradeFinalizeService", intervalMs, INTERVAL_UNIT,
THREAD_POOL_SIZE, TIMEOUT,
+ ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.versionManager = versionManager;
+ this.scmClient = scmClient;
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ if (!versionManager.needsFinalization()) {
+ // Finalization is done (or was never needed), so this service can now
shutdown. To avoid deadlocking on the
+ // executor.awaitTermination by calling shutdown directly, spawn a
thread to perform the shutdown which will
+ // block until this task / thread completes in the executor.
+ if (stopInitiated.compareAndSet(false, true)) {
+ LOG.info("OMUpgradeFinalizeService: finalization is no longer needed,
shutting down.");
+ Thread stopper = new Thread(this::shutdown,
"OMUpgradeFinalizeService-stopper");
+ stopper.setDaemon(true);
+ stopper.start();
+ }
+ return queue; // empty — PeriodicalTask.run() will return without
scheduling work
+ }
+ if (ozoneManager.isLeaderReady()) {
+ queue.add(new UpgradeStatusCheckTask());
+ }
+ return queue;
+ }
+
+ /**
+ * Periodic task that checks upgrade finalization status and logs the result.
+ */
+ private class UpgradeStatusCheckTask implements BackgroundTask {
+
+ @Override
+ public BackgroundTaskResult call() {
+ if (!ozoneManager.isLeaderReady()) {
+ LOG.debug("OMUpgradeFinalizeService: skipping check — not the
leader.");
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+ if (versionManager.needsFinalization()) {
+ try {
+ // To finalize OM, first finalization needs to have been started.
Then SCM needs to indicate that it has
+ // completed its finalization work. Only once both of those things
have happened can OM finalize.
+ String finalizationInProgress =
+
ozoneManager.getMetadataManager().getMetaTable().get(FINALIZATION_IN_PROGRESS_KEY);
+ if (finalizationInProgress == null) {
+ LOG.debug("OMUpgradeFinalizeService: skipping check — finalization
is not in progress.");
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+
+ HddsProtos.UpgradeStatus upgradeStatus =
scmClient.getContainerClient().queryUpgradeStatus();
+ if (upgradeStatus.getShouldFinalize()) {
+ LOG.info("The SCM Upgrade has been finalized. OM will now
finalize");
+
+ OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.FinalizeUpgrade)
+ .setClientId(ozoneManager.getOMNodeId())
Review Comment:
Looks like the key deleting services use the same client ID for both the OM
request and the Ratis request, and it is a random one. We should probably do
the same here instead of using the OM node ID for the OM request.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizeService.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.upgrade;
+
+import static org.apache.hadoop.ozone.OzoneConsts.FINALIZATION_IN_PROGRESS_KEY;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ScmClient;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A background service that periodically checks whether SCM has completed
finalization of an upgrade and, if so,
+ * finalizes the OM upgrade.
+ */
+public class OMUpgradeFinalizeService extends BackgroundService {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OMUpgradeFinalizeService.class);
+
+ private static final int THREAD_POOL_SIZE = 1;
+ private static final TimeUnit INTERVAL_UNIT = TimeUnit.MILLISECONDS;
+ private static final long TIMEOUT = 60000;
+ private static final AtomicLong RUN_COUNT = new AtomicLong(0);
+
+ private final OzoneManager ozoneManager;
+ private final OMVersionManager versionManager;
+ private final ScmClient scmClient;
+ private final AtomicBoolean stopInitiated = new AtomicBoolean(false);
+
+ /**
+ * Creates an {@code OMUpgradeFinalizeService} with a custom check interval.
+ *
+ * @param ozoneManager the OzoneManager instance
+ * @param versionManager the {@link OMVersionManager} to query the
finalization status
+ * @param scmClient the scmClient instance used to query SCM
+ * @param intervalMs the duration to wait between checks
+ */
+ public OMUpgradeFinalizeService(OzoneManager ozoneManager, OMVersionManager
versionManager, ScmClient scmClient,
+ long intervalMs) {
+ super("OMUpgradeFinalizeService", intervalMs, INTERVAL_UNIT,
THREAD_POOL_SIZE, TIMEOUT,
+ ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.versionManager = versionManager;
+ this.scmClient = scmClient;
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ if (!versionManager.needsFinalization()) {
+ // Finalization is done (or was never needed), so this service can now
shutdown. To avoid deadlocking on the
+ // executor.awaitTermination by calling shutdown directly, spawn a
thread to perform the shutdown which will
+ // block until this task / thread completes in the executor.
+ if (stopInitiated.compareAndSet(false, true)) {
+ LOG.info("OMUpgradeFinalizeService: finalization is no longer needed,
shutting down.");
+ Thread stopper = new Thread(this::shutdown,
"OMUpgradeFinalizeService-stopper");
+ stopper.setDaemon(true);
+ stopper.start();
+ }
+ return queue; // empty — PeriodicalTask.run() will return without
scheduling work
+ }
+ if (ozoneManager.isLeaderReady()) {
+ queue.add(new UpgradeStatusCheckTask());
+ }
+ return queue;
+ }
+
+ /**
+ * Periodic task that checks upgrade finalization status and logs the result.
+ */
+ private class UpgradeStatusCheckTask implements BackgroundTask {
+
+ @Override
+ public BackgroundTaskResult call() {
+ if (!ozoneManager.isLeaderReady()) {
+ LOG.debug("OMUpgradeFinalizeService: skipping check — not the
leader.");
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+ if (versionManager.needsFinalization()) {
+ try {
+ // To finalize OM, first finalization needs to have been started.
Then SCM needs to indicate that it has
+ // completed its finalization work. Only once both of those things
have happened can OM finalize.
+ String finalizationInProgress =
+
ozoneManager.getMetadataManager().getMetaTable().get(FINALIZATION_IN_PROGRESS_KEY);
+ if (finalizationInProgress == null) {
+ LOG.debug("OMUpgradeFinalizeService: skipping check — finalization
is not in progress.");
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+
+ HddsProtos.UpgradeStatus upgradeStatus =
scmClient.getContainerClient().queryUpgradeStatus();
+ if (upgradeStatus.getShouldFinalize()) {
+ LOG.info("The SCM Upgrade has been finalized. OM will now
finalize");
+
+ OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.FinalizeUpgrade)
+ .setClientId(ozoneManager.getOMNodeId())
+ .build();
+ OzoneManagerProtocolProtos.OMResponse response =
OzoneManagerRatisUtils.submitRequest(
+ ozoneManager, omRequest, ClientId.randomId(),
RUN_COUNT.getAndIncrement());
Review Comment:
We should increment the run count even for no-op operations, and include it in
a summary log message at the end of the service, similar to what the deleting
services do.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]