devmadhuu commented on code in PR #7517:
URL: https://github.com/apache/ozone/pull/7517#discussion_r1886142887
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java:
##########
@@ -186,6 +186,11 @@ public final class ReconServerConfigKeys {
public static final int
OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT = 3;
+ public static final String OZONE_RECON_TASK_STATUS_COUNTER_DURATION =
Review Comment:
Please change the name as per the comment above.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java:
##########
@@ -110,6 +111,7 @@ protected void configure() {
install(new ReconOmTaskBindingModule());
install(new ReconDaoBindingModule());
+ bind(ReconTaskStatusCounter.class).in(Singleton.class);
Review Comment:
Please also annotate the `ReconTaskStatusCounter` class with `@Singleton`.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconTaskStatusCounter.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.metrics;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.api.types.ReconTaskStatusStat;
+
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_STATUS_COUNTER_DURATION;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_STATUS_COUNTER_DURATION_DEFAULT;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class contains definitions and implementation of Recon Task Status
counters
+ * For each task we maintain a count of the successes and the failures.
+ * This count is stored for a configurable
+ * {@link
org.apache.hadoop.ozone.recon.ReconServerConfigKeys#OZONE_RECON_TASK_STATUS_COUNTER_DURATION}
which defaults
+ * to '5' times {@link
org.apache.hadoop.ozone.recon.ReconServerConfigKeys#OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY}
+ * Each task is mapped to a {@link ReconTaskStatusStat} instance to store the
counts.
+ */
+public class ReconTaskStatusCounter {
+ // Stores the configurable timeout duration i.e. the TTL of the counts
+ private final long timeoutDuration;
+
+ // Task name is mapped from the enum to a Pair of <count of successful runs,
count of failed runs>
+ private final Map<String, ReconTaskStatusStat> taskStatusCounter = new
ConcurrentHashMap<>();
+
+ public ReconTaskStatusCounter() {
+ OzoneConfiguration conf = new OzoneConfiguration();
Review Comment:
This does not look correct. `OzoneConfiguration` is already initialized in
`ReconControllerModule`; it should be injected into this constructor from
there using Guice instead of being created again here.
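A minimal sketch of the constructor-injection shape, written in plain Java so it stands alone. `SimpleConf` is a hypothetical stand-in for `OzoneConfiguration`, and the key name and values are illustrative only. In the real class the constructor would presumably carry Guice's `@Inject` and receive the instance already bound in `ReconControllerModule`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for OzoneConfiguration, only so the sketch compiles alone.
class SimpleConf {
  private final Map<String, String> values = new HashMap<>();

  void set(String key, String value) {
    values.put(key, value);
  }

  int getInt(String key, int defaultValue) {
    String raw = values.get(key);
    return raw == null ? defaultValue : Integer.parseInt(raw.trim());
  }
}

// The counter receives its configuration instead of creating a new one.
class InjectedCounterSketch {
  private final int countCycles;

  // Assumption: in Recon this constructor would be annotated with @Inject.
  InjectedCounterSketch(SimpleConf conf) {
    // "example.counter.cycles" is an illustrative key, not a real Ozone key.
    this.countCycles = conf.getInt("example.counter.cycles", 5);
  }

  int getCountCycles() {
    return countCycles;
  }
}
```

With this shape, a value overridden on the shared configuration object reaches the counter, whereas a freshly constructed configuration may not reflect overrides set programmatically elsewhere (for example in tests).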
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconTaskStatusCounter.java:
##########
@@ -0,0 +1,100 @@
+
+package org.apache.hadoop.ozone.recon.metrics;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.api.types.ReconTaskStatusStat;
+
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_STATUS_COUNTER_DURATION;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_STATUS_COUNTER_DURATION_DEFAULT;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class contains definitions and implementation of Recon Task Status
counters
+ * For each task we maintain a count of the successes and the failures.
+ * This count is stored for a configurable
+ * {@link
org.apache.hadoop.ozone.recon.ReconServerConfigKeys#OZONE_RECON_TASK_STATUS_COUNTER_DURATION}
which defaults
+ * to '5' times {@link
org.apache.hadoop.ozone.recon.ReconServerConfigKeys#OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY}
+ * Each task is mapped to a {@link ReconTaskStatusStat} instance to store the
counts.
+ */
+public class ReconTaskStatusCounter {
+ // Stores the configurable timeout duration i.e. the TTL of the counts
+ private final long timeoutDuration;
+
+ // Task name is mapped from the enum to a Pair of <count of successful runs,
count of failed runs>
+ private final Map<String, ReconTaskStatusStat> taskStatusCounter = new
ConcurrentHashMap<>();
+
+ public ReconTaskStatusCounter() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ int countCycles = conf.getInt(
+ OZONE_RECON_TASK_STATUS_COUNTER_DURATION,
+ OZONE_RECON_TASK_STATUS_COUNTER_DURATION_DEFAULT
+ );
+ long taskSyncInterval = conf.getTimeDuration(
+ OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY,
+ OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
+ TimeUnit.MILLISECONDS
+ );
+ timeoutDuration = taskSyncInterval * countCycles;
+ }
+
+ /**
+ * Checks if the duration of the counters exceeded
+ * the configured {@link org.apache.hadoop.ozone.recon.ReconServerConfigKeys
+ * OZONE_RECON_TASK_STATUS_STORAGE_DURATION} duration.
+ * Default duration/TTL of the counter is 30 minutes
Review Comment:
It is not clear how the default duration works out to 30 minutes. Please explain.
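For reference, the constructor computes the TTL as the cycle count multiplied by the sync interval. A small sketch of that arithmetic follows; the interval values are illustrative assumptions, not the actual Ozone defaults, and `CounterTtlSketch` is a hypothetical name.

```java
import java.time.Duration;

class CounterTtlSketch {
  // TTL = number of cycles * length of one sync interval.
  static Duration ttl(int countCycles, Duration taskSyncInterval) {
    return taskSyncInterval.multipliedBy(countCycles);
  }

  public static void main(String[] args) {
    // 5 cycles only give 30 minutes if one interval is 6 minutes (illustrative).
    System.out.println(ttl(5, Duration.ofMinutes(6)));  // PT30M
    // With a 1 minute interval (illustrative) the TTL is 5 minutes.
    System.out.println(ttl(5, Duration.ofMinutes(1)));  // PT5M
  }
}
```

So the documented default only holds if the effective interval default, multiplied by the default cycle count, really equals 30 minutes.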
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconScmTask.java:
##########
@@ -18,33 +18,35 @@
package org.apache.hadoop.ozone.recon.scm;
+import org.apache.hadoop.ozone.recon.metrics.ReconTaskStatusCounter;
+import org.apache.hadoop.ozone.recon.tasks.ReconTaskStatusUpdater;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
-import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
+import org.jooq.exception.DataAccessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.sql.SQLException;
+
/**
* Any background task that keeps SCM's metadata up to date.
*/
public abstract class ReconScmTask {
private static final Logger LOG =
LoggerFactory.getLogger(ReconScmTask.class);
private Thread taskThread;
- private ReconTaskStatusDao reconTaskStatusDao;
private volatile boolean running;
+ private final ReconTaskStatusUpdater taskStatusUpdater;
- protected ReconScmTask(ReconTaskStatusDao reconTaskStatusDao) {
- this.reconTaskStatusDao = reconTaskStatusDao;
+ protected ReconScmTask(
+ ReconTaskStatusDao reconTaskStatusDao, ReconTaskStatusCounter
taskStatusCounter
+ ) {
+ this.taskStatusUpdater = new ReconTaskStatusUpdater(reconTaskStatusDao,
taskStatusCounter, getTaskName());
}
private void register() {
String taskName = getTaskName();
- if (!reconTaskStatusDao.existsById(taskName)) {
- ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(
- taskName, 0L, 0L);
- reconTaskStatusDao.insert(reconTaskStatusRecord);
- LOG.info("Registered {} task ", taskName);
- }
+ taskStatusUpdater.setTaskName(taskName);
+ taskStatusUpdater.updateDetails();
Review Comment:
This update overwrites the existing status record with all column values set
to zero, as if it were a first-time insertion, even when the task record
already exists. If the SCM task then fails and cannot run after registration,
those overwritten values are left behind as dirty data.
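One way to avoid the overwrite is to keep the insert-if-absent behaviour of the removed code. A minimal sketch follows, using an in-memory map as a hypothetical stand-in for `ReconTaskStatusDao`; `TaskRecord` and `TaskRegistrySketch` are illustrative names, not classes from this PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative record; the real table has more columns.
class TaskRecord {
  final String taskName;
  final long lastUpdatedTimestamp;
  final long lastUpdatedSeqNumber;

  TaskRecord(String taskName, long lastUpdatedTimestamp, long lastUpdatedSeqNumber) {
    this.taskName = taskName;
    this.lastUpdatedTimestamp = lastUpdatedTimestamp;
    this.lastUpdatedSeqNumber = lastUpdatedSeqNumber;
  }
}

// Hypothetical in-memory stand-in for the status table.
class TaskRegistrySketch {
  private final Map<String, TaskRecord> table = new ConcurrentHashMap<>();

  // Inserts a zeroed record only when the task has never been registered.
  // Returns true if a new record was created, false if one already existed.
  boolean registerIfAbsent(String taskName) {
    return table.putIfAbsent(taskName, new TaskRecord(taskName, 0L, 0L)) == null;
  }

  // Stores the latest values for a task after a run.
  void update(TaskRecord record) {
    table.put(record.taskName, record);
  }

  TaskRecord get(String taskName) {
    return table.get(taskName);
  }
}
```

Registering the same task a second time then leaves the values written by earlier runs untouched.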
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java:
##########
@@ -270,24 +272,16 @@ public
ReconStorageContainerManagerFacade(OzoneConfiguration conf,
new PipelineActionHandler(pipelineManager, scmContext, conf);
ReconTaskConfig reconTaskConfig = conf.getObject(ReconTaskConfig.class);
- PipelineSyncTask pipelineSyncTask = new PipelineSyncTask(
- pipelineManager,
- nodeManager,
- scmServiceProvider,
- reconTaskStatusDao,
- reconTaskConfig);
- containerHealthTask = new ContainerHealthTask(
- containerManager, scmServiceProvider, reconTaskStatusDao,
- containerHealthSchemaManager, containerPlacementPolicy,
reconTaskConfig,
- reconContainerMetadataManager, conf);
-
- this.containerSizeCountTask = new ContainerSizeCountTask(
- containerManager,
- scmServiceProvider,
- reconTaskStatusDao,
- reconTaskConfig,
- containerCountBySizeDao,
- utilizationSchemaDefinition);
+ PipelineSyncTask pipelineSyncTask = new PipelineSyncTask(pipelineManager,
nodeManager,
Review Comment:
Just a suggestion: when new code needs an additional argument, please don't
change the order of the existing arguments. Just add the new argument at
the end.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -590,26 +589,31 @@ public boolean syncDataFromOM() {
Thread.currentThread().interrupt();
} catch (Exception e) {
metrics.incrNumDeltaRequestsFailed();
+ reconTaskUpdater.setLastTaskRunStatus(-1);
LOG.warn("Unable to get and apply delta updates from OM.",
e.getMessage());
fullSnapshot = true;
+ } finally {
+ reconTaskUpdater.setIsCurrentTaskRunning(0);
+ reconTaskUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
+ reconTaskUpdater.updateDetails();
}
}
if (fullSnapshot) {
+ reconTaskUpdater = reconTaskUpdater.getInstanceWithTask(OmSnapshotTaskName.OmSnapshotRequest.name());
try {
metrics.incrNumSnapshotRequests();
LOG.info("Obtaining full snapshot from Ozone Manager");
// Update local Recon OM DB to new snapshot.
+ reconTaskUpdater.setIsCurrentTaskRunning(1);
+ reconTaskUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
+ reconTaskUpdater.updateDetails();
boolean success = updateReconOmDBWithNewSnapshot();
// Update timestamp of successful delta updates query.
if (success) {
- ReconTaskStatus reconTaskStatusRecord =
- new ReconTaskStatus(
- OmSnapshotTaskName.OmSnapshotRequest.name(),
- System.currentTimeMillis(),
- getCurrentOMDBSequenceNumber());
- reconTaskStatusDao.update(reconTaskStatusRecord);
+ reconTaskUpdater.setLastUpdatedTimestamp(getCurrentOMDBSequenceNumber());
Review Comment:
I think this is a mistake: the value of `getCurrentOMDBSequenceNumber()` is
being passed to `setLastUpdatedTimestamp`. Please correct this.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -571,17 +566,21 @@ public boolean syncDataFromOM() {
if (currentSequenceNumber <= 0) {
fullSnapshot = true;
} else {
+ reconTaskUpdater = reconTaskUpdater.getInstanceWithTask(OmSnapshotTaskName.OmDeltaRequest.name());
+ reconTaskUpdater.setLastUpdatedSeqNumber(currentSequenceNumber);
+ reconTaskUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
try (OMDBUpdatesHandler omdbUpdatesHandler =
new OMDBUpdatesHandler(omMetadataManager)) {
LOG.info("Obtaining delta updates from Ozone Manager");
// Get updates from OM and apply to local Recon OM DB.
+ reconTaskUpdater.setIsCurrentTaskRunning(1);
+ reconTaskUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
+ reconTaskUpdater.updateDetails();
Review Comment:
Before calling this, you need to update the currentRunTaskStatus as well.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -248,14 +249,31 @@ private List<String>
processTaskResults(List<Future<Pair<String, Boolean>>>
List<String> failedTasks = new ArrayList<>();
for (Future<Pair<String, Boolean>> f : results) {
String taskName = f.get().getLeft();
+ ReconTaskStatusUpdater taskStatusUpdater = getUpdaterForTask(taskName);
+ taskStatusUpdater.setLastUpdatedSeqNumber(events.getLastSequenceNumber());
Review Comment:
Same as above. We should not overwrite the last-run sequence number until
the current task run has succeeded.
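A sketch of the ordering being asked for: record the new sequence number only when the run succeeded, and leave the previous value untouched otherwise. `StatusSketch` and its fields are hypothetical simplifications of the updater; the `0` and `-1` status values follow the usage elsewhere in this PR.

```java
// Hypothetical, simplified view of one task's status row.
class StatusSketch {
  long lastUpdatedSeqNumber;
  long lastUpdatedTimestamp;
  int lastTaskRunStatus;
  int isCurrentTaskRunning = 1;

  // Called once a run has finished, whether it succeeded or not.
  void finishRun(boolean success, long newSeqNumber, long nowMillis) {
    isCurrentTaskRunning = 0;
    lastUpdatedTimestamp = nowMillis;
    if (success) {
      lastTaskRunStatus = 0;
      // Only a successful run may advance the sequence number.
      lastUpdatedSeqNumber = newSeqNumber;
    } else {
      lastTaskRunStatus = -1;
    }
  }
}
```

A failed run therefore still updates the timestamp and the run status, but the stored sequence number keeps pointing at the last successfully processed batch.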
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -107,9 +114,14 @@ public synchronized void
consumeOMEvents(OMUpdateEventBatch events,
try {
if (!events.isEmpty()) {
Collection<Callable<Pair<String, Boolean>>> tasks = new ArrayList<>();
+ ReconTaskStatus reconTaskStatus;
for (Map.Entry<String, ReconOmTask> taskEntry :
reconOmTasks.entrySet()) {
ReconOmTask task = taskEntry.getValue();
+ ReconTaskStatusUpdater taskStatusUpdater =
getUpdaterForTask(task.getTaskName());
+ taskStatusUpdater.setIsCurrentTaskRunning(1);
+ taskStatusUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
+ taskStatusUpdater.updateDetails();
Review Comment:
Later in this same method, failed tasks are retried. The current status of
those failed tasks needs to be updated during the retries as well.
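Roughly what is meant, as a self-contained sketch: each retry records its own outcome, so the stored status reflects the latest attempt rather than the first failure. `RetryStatusSketch` is a hypothetical name, the map stands in for the per-task updater, and the `0` and `-1` values follow the usage elsewhere in this PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class RetryStatusSketch {
  // Last known run status per task: 0 = success, -1 = failure.
  final Map<String, Integer> lastRunStatus = new LinkedHashMap<>();

  // Retries each failed task once and records the outcome of that retry.
  // Returns the names of the tasks that failed again.
  List<String> retry(List<String> failedTasks, Predicate<String> runTask) {
    List<String> stillFailed = new ArrayList<>();
    for (String taskName : failedTasks) {
      boolean success = runTask.test(taskName);
      lastRunStatus.put(taskName, success ? 0 : -1);
      if (!success) {
        stillFailed.add(taskName);
      }
    }
    return stillFailed;
  }
}
```

The same idea applies to the running flag and timestamp: they would be set before each retry and cleared after it.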
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconTaskStatusCounter.java:
##########
@@ -0,0 +1,100 @@
+
+package org.apache.hadoop.ozone.recon.metrics;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.api.types.ReconTaskStatusStat;
+
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_STATUS_COUNTER_DURATION;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_STATUS_COUNTER_DURATION_DEFAULT;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class contains definitions and implementation of Recon Task Status
counters
+ * For each task we maintain a count of the successes and the failures.
+ * This count is stored for a configurable
+ * {@link
org.apache.hadoop.ozone.recon.ReconServerConfigKeys#OZONE_RECON_TASK_STATUS_COUNTER_DURATION}
which defaults
+ * to '5' times {@link
org.apache.hadoop.ozone.recon.ReconServerConfigKeys#OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY}
Review Comment:
The config constant referenced in this comment needs to be changed as well.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/TaskStatusService.java:
##########
@@ -38,16 +42,28 @@ public class TaskStatusService {
@Inject
private ReconTaskStatusDao reconTaskStatusDao;
+ @Inject
+ private ReconTaskStatusCounter taskStatusCounter;
+
+ // Internal function to combine counter value with DerbyDB values
+ private ReconTaskStatusResponse convertToTaskStatusResponse(ReconTaskStatus
task) {
+ ReconTaskStatusStat counter =
taskStatusCounter.getTaskCountFor(task.getTaskName());
+ return new ReconTaskStatusResponse(
+ task.getTaskName(), task.getLastUpdatedSeqNumber(),
task.getLastUpdatedTimestamp(),
+ task.getIsCurrentTaskRunning(), task.getLastTaskRunStatus(),
+ counter.getSuccessCount(), counter.getFailureCount(),
counter.getInitializationTime());
+ }
/**
- * Return the list of Recon Tasks and the last successful timestamp and
- * sequence number.
+ * Return the list of Recon Tasks and associated metrics.
Review Comment:
```suggestion
* Return the list of Recon Tasks and associated stats.
```
##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -3304,6 +3304,14 @@
Max retry count for SCM Client when failover happens.
</description>
</property>
+ <property>
+ <name>ozone.recon.task.status.counter.duration</name>
Review Comment:
This config is not a duration. Need to change the name.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconTaskStatusCounter.java:
##########
@@ -0,0 +1,100 @@
+
+package org.apache.hadoop.ozone.recon.metrics;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.api.types.ReconTaskStatusStat;
+
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_STATUS_COUNTER_DURATION;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_STATUS_COUNTER_DURATION_DEFAULT;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class contains definitions and implementation of Recon Task Status
counters
+ * For each task we maintain a count of the successes and the failures.
+ * This count is stored for a configurable
+ * {@link
org.apache.hadoop.ozone.recon.ReconServerConfigKeys#OZONE_RECON_TASK_STATUS_COUNTER_DURATION}
which defaults
+ * to '5' times {@link
org.apache.hadoop.ozone.recon.ReconServerConfigKeys#OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY}
+ * Each task is mapped to a {@link ReconTaskStatusStat} instance to store the
counts.
+ */
+public class ReconTaskStatusCounter {
+ // Stores the configurable timeout duration i.e. the TTL of the counts
+ private final long timeoutDuration;
+
+ // Task name is mapped from the enum to a Pair of <count of successful runs,
count of failed runs>
+ private final Map<String, ReconTaskStatusStat> taskStatusCounter = new
ConcurrentHashMap<>();
+
+ public ReconTaskStatusCounter() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ int countCycles = conf.getInt(
+ OZONE_RECON_TASK_STATUS_COUNTER_DURATION,
+ OZONE_RECON_TASK_STATUS_COUNTER_DURATION_DEFAULT
+ );
+ long taskSyncInterval = conf.getTimeDuration(
+ OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY,
+ OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
Review Comment:
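This seems incorrect: the default passed for
`OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY` is the initial-delay default
(`OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT`). Please check.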
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java:
##########
@@ -186,6 +186,11 @@ public final class ReconServerConfigKeys {
public static final int
OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT = 3;
+ public static final String OZONE_RECON_TASK_STATUS_COUNTER_DURATION =
+ "ozone.recon.task.status.counter.duration";
+
+ public static final int OZONE_RECON_TASK_STATUS_COUNTER_DURATION_DEFAULT = 5;
Review Comment:
This too needs to be changed accordingly.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconTaskStatusTableUpgradeAction.java:
##########
@@ -0,0 +1,79 @@
+
+package org.apache.hadoop.ozone.recon.upgrade;
+
+import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.jooq.DSLContext;
+import org.jooq.exception.DataAccessException;
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static org.hadoop.ozone.recon.codegen.SqlDbUtils.COLUMN_EXISTS_CHECK;
+import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK;
+import static
org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition.RECON_TASK_STATUS_TABLE_NAME;
+
+
+/**
+ * Upgrade action for TASK_STATUS_STATISTICS feature layout change, which adds
+ * <code>last_task_run_status</code> and <code>current_task_run_status</code>
columns to
+ * {@link org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition} in case it
is missing .
+ */
+@UpgradeActionRecon(feature = ReconLayoutFeature.TASK_STATUS_STATISTICS,
+ type = ReconUpgradeAction.UpgradeActionType.FINALIZE)
+public class ReconTaskStatusTableUpgradeAction implements ReconUpgradeAction {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(ReconTaskStatusTableUpgradeAction.class);
+
+ @Override
+ public void execute(ReconStorageContainerManagerFacade scmFacade) throws
DataAccessException {
+ DataSource dataSource = scmFacade.getDataSource();
+ try (Connection conn = dataSource.getConnection()) {
+ if (!TABLE_EXISTS_CHECK.test(conn, RECON_TASK_STATUS_TABLE_NAME)) {
+ return;
+ }
+ DSLContext dslContext = DSL.using(conn);
+
+ // This is a workaround as currently the upgrade action runs even for a
fresh install
+ // TODO: Remove the check once HDDS-11846 is fixed
+ if (!COLUMN_EXISTS_CHECK.apply(conn, RECON_TASK_STATUS_TABLE_NAME, "last_task_run_status")
+ && !COLUMN_EXISTS_CHECK.apply(conn, RECON_TASK_STATUS_TABLE_NAME, "current_task_run_status")) {
Review Comment:
A single DB call is sufficient to check the existence of both columns.
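A possible shape for this, as a sketch: read the table's column names once through JDBC metadata and test both names against that set. `ColumnCheckSketch` and its methods are hypothetical helpers, not part of `SqlDbUtils`, and whether the table name needs upper-casing for the metadata lookup depends on how the database stores identifiers, which is an assumption to verify.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

class ColumnCheckSketch {
  // Reads all column names of a table with one metadata query.
  static Set<String> columnsOf(Connection conn, String tableName) throws SQLException {
    Set<String> columns = new HashSet<>();
    // "%" matches every column of the given table.
    try (ResultSet rs = conn.getMetaData().getColumns(null, null, tableName, "%")) {
      while (rs.next()) {
        columns.add(rs.getString("COLUMN_NAME").toLowerCase(Locale.ROOT));
      }
    }
    return columns;
  }

  // True if none of the required columns exist yet (case-insensitive).
  static boolean noneExist(Set<String> existing, String... required) {
    for (String column : required) {
      if (existing.contains(column.toLowerCase(Locale.ROOT))) {
        return false;
      }
    }
    return true;
  }
}
```

The upgrade action could then call `columnsOf` once and pass the result to `noneExist` with both column names, which keeps the condition equivalent to the current one.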
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java:
##########
@@ -138,15 +144,24 @@ public void triggerContainerHealthCheck() {
unhealthyContainerStateStatsMap);
long start = Time.monotonicNow();
long currentTime = System.currentTimeMillis();
+ taskStatusUpdater.setIsCurrentTaskRunning(1);
+ taskStatusUpdater.setLastUpdatedTimestamp(currentTime);
+ taskStatusUpdater.updateDetails();
long existingCount = processExistingDBRecords(currentTime,
unhealthyContainerStateStatsMap);
LOG.debug("Container Health task thread took {} milliseconds to" +
" process {} existing database records.",
Time.monotonicNow() - start, existingCount);
checkAndProcessContainers(unhealthyContainerStateStatsMap, currentTime);
+ taskStatusUpdater.setLastTaskRunStatus(0);
processedContainers.clear();
+ } catch (Exception e) {
+ // For any exception that is thrown we know the container health check
has failed,
Review Comment:
Add an error log here with just exception object `e`
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconTaskStatusCounter.java:
##########
@@ -0,0 +1,100 @@
+
+package org.apache.hadoop.ozone.recon.metrics;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.api.types.ReconTaskStatusStat;
+
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_STATUS_COUNTER_DURATION;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_STATUS_COUNTER_DURATION_DEFAULT;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class contains definitions and implementation of Recon Task Status
counters
+ * For each task we maintain a count of the successes and the failures.
+ * This count is stored for a configurable
+ * {@link
org.apache.hadoop.ozone.recon.ReconServerConfigKeys#OZONE_RECON_TASK_STATUS_COUNTER_DURATION}
which defaults
+ * to '5' times {@link
org.apache.hadoop.ozone.recon.ReconServerConfigKeys#OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY}
+ * Each task is mapped to a {@link ReconTaskStatusStat} instance to store the
counts.
+ */
+public class ReconTaskStatusCounter {
+ // Stores the configurable timeout duration i.e. the TTL of the counts
+ private final long timeoutDuration;
+
+ // Task name is mapped from the enum to a Pair of <count of successful runs,
count of failed runs>
+ private final Map<String, ReconTaskStatusStat> taskStatusCounter = new
ConcurrentHashMap<>();
+
+ public ReconTaskStatusCounter() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ int countCycles = conf.getInt(
+ OZONE_RECON_TASK_STATUS_COUNTER_DURATION,
+ OZONE_RECON_TASK_STATUS_COUNTER_DURATION_DEFAULT
+ );
+ long taskSyncInterval = conf.getTimeDuration(
+ OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY,
+ OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
+ TimeUnit.MILLISECONDS
+ );
+ timeoutDuration = taskSyncInterval * countCycles;
+ }
+
+ /**
+ * Checks if the duration of the counters exceeded
+ * the configured {@link org.apache.hadoop.ozone.recon.ReconServerConfigKeys
+ * OZONE_RECON_TASK_STATUS_STORAGE_DURATION} duration.
+ * Default duration/TTL of the counter is 30 minutes
+ * In case the count data TTL is reached, reinitialize the instance to reset
the data, else do nothing
+ */
+ private void checkCountDataExpiry(String taskName) {
+ ReconTaskStatusStat taskStat = taskStatusCounter.getOrDefault(taskName,
new ReconTaskStatusStat());
+ //Since initially the task list is empty, each task will get initialized
at different times
+ if ((System.currentTimeMillis() - taskStat.getInitializationTime()) >
timeoutDuration) {
+ // If the task stat TTL is expired, we want to reset the associated
counters
+ taskStat.reset();
+ }
+ // Update the map with the for the task stats - this adds the value if not
already present
+ // else update the stat with initial values if TTL is over
+ taskStatusCounter.put(taskName, taskStat);
+ }
+
+ /**
+ * Update the counter's success/failure count based on the task name passed.
+ * @param taskName The task name for which we want to update the counter
+ * @param successful Whether the task was successful or not
+ */
+ public void updateCounter(String taskName, boolean successful) {
+ checkCountDataExpiry(taskName);
+ taskStatusCounter.putIfAbsent(taskName, new ReconTaskStatusStat());
+
+ if (successful) {
+ taskStatusCounter.get(taskName).incrementSuccess();
+ } else {
+ taskStatusCounter.get(taskName).incrementFailure();
+ }
+ }
+
+ public ReconTaskStatusStat getTaskCountFor(String taskName) {
Review Comment:
Please rename this method, since in future this class may hold other
stats as well, not just counts.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/TaskStatusService.java:
##########
@@ -38,16 +42,28 @@ public class TaskStatusService {
@Inject
private ReconTaskStatusDao reconTaskStatusDao;
+ @Inject
+ private ReconTaskStatusCounter taskStatusCounter;
+
+ // Internal function to combine counter value with DerbyDB values
+ private ReconTaskStatusResponse convertToTaskStatusResponse(ReconTaskStatus
task) {
+ ReconTaskStatusStat counter =
taskStatusCounter.getTaskCountFor(task.getTaskName());
+ return new ReconTaskStatusResponse(
+ task.getTaskName(), task.getLastUpdatedSeqNumber(),
task.getLastUpdatedTimestamp(),
+ task.getIsCurrentTaskRunning(), task.getLastTaskRunStatus(),
+ counter.getSuccessCount(), counter.getFailureCount(),
counter.getInitializationTime());
+ }
/**
- * Return the list of Recon Tasks and the last successful timestamp and
- * sequence number.
+ * Return the list of Recon Tasks and associated metrics.
* @return {@link Response}
*/
@GET
@Path("status")
- public Response getTaskTimes() {
+ public Response getTaskMetrics() {
Review Comment:
```suggestion
public Response getTaskStats() {
```
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconScmTask.java:
##########
@@ -18,33 +18,35 @@
package org.apache.hadoop.ozone.recon.scm;
+import org.apache.hadoop.ozone.recon.metrics.ReconTaskStatusCounter;
+import org.apache.hadoop.ozone.recon.tasks.ReconTaskStatusUpdater;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
-import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
+import org.jooq.exception.DataAccessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.sql.SQLException;
+
/**
* Any background task that keeps SCM's metadata up to date.
*/
public abstract class ReconScmTask {
private static final Logger LOG =
LoggerFactory.getLogger(ReconScmTask.class);
private Thread taskThread;
- private ReconTaskStatusDao reconTaskStatusDao;
private volatile boolean running;
+ private final ReconTaskStatusUpdater taskStatusUpdater;
- protected ReconScmTask(ReconTaskStatusDao reconTaskStatusDao) {
- this.reconTaskStatusDao = reconTaskStatusDao;
+ protected ReconScmTask(
+ ReconTaskStatusDao reconTaskStatusDao, ReconTaskStatusCounter
taskStatusCounter
+ ) {
+ this.taskStatusUpdater = new ReconTaskStatusUpdater(reconTaskStatusDao,
taskStatusCounter, getTaskName());
}
private void register() {
String taskName = getTaskName();
- if (!reconTaskStatusDao.existsById(taskName)) {
- ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(
- taskName, 0L, 0L);
- reconTaskStatusDao.insert(reconTaskStatusRecord);
- LOG.info("Registered {} task ", taskName);
- }
+ taskStatusUpdater.setTaskName(taskName);
Review Comment:
You have already set `taskName` in the constructor. Is this call needed?
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/TaskStatusService.java:
##########
@@ -38,16 +42,28 @@ public class TaskStatusService {
@Inject
private ReconTaskStatusDao reconTaskStatusDao;
+ @Inject
+ private ReconTaskStatusCounter taskStatusCounter;
+
+ // Internal function to combine counter value with DerbyDB values
+ private ReconTaskStatusResponse convertToTaskStatusResponse(ReconTaskStatus
task) {
+ ReconTaskStatusStat counter =
taskStatusCounter.getTaskCountFor(task.getTaskName());
+ return new ReconTaskStatusResponse(
+ task.getTaskName(), task.getLastUpdatedSeqNumber(),
task.getLastUpdatedTimestamp(),
+ task.getIsCurrentTaskRunning(), task.getLastTaskRunStatus(),
+ counter.getSuccessCount(), counter.getFailureCount(),
counter.getInitializationTime());
+ }
/**
- * Return the list of Recon Tasks and the last successful timestamp and
- * sequence number.
+ * Return the list of Recon Tasks and associated metrics.
* @return {@link Response}
*/
@GET
@Path("status")
- public Response getTaskTimes() {
+ public Response getTaskMetrics() {
List<ReconTaskStatus> resultSet = reconTaskStatusDao.findAll();
- return Response.ok(resultSet).build();
+ List<ReconTaskStatusResponse> taskMetricsList = resultSet.stream().map(
Review Comment:
```suggestion
List<ReconTaskStatusResponse> taskStatusList = resultSet.stream().map(
```
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java:
##########
@@ -138,15 +144,24 @@ public void triggerContainerHealthCheck() {
unhealthyContainerStateStatsMap);
long start = Time.monotonicNow();
long currentTime = System.currentTimeMillis();
+ taskStatusUpdater.setIsCurrentTaskRunning(1);
+ taskStatusUpdater.setLastUpdatedTimestamp(currentTime);
+ taskStatusUpdater.updateDetails();
long existingCount = processExistingDBRecords(currentTime,
unhealthyContainerStateStatsMap);
LOG.debug("Container Health task thread took {} milliseconds to" +
" process {} existing database records.",
Time.monotonicNow() - start, existingCount);
checkAndProcessContainers(unhealthyContainerStateStatsMap, currentTime);
+ taskStatusUpdater.setLastTaskRunStatus(0);
processedContainers.clear();
Review Comment:
Add a debug log after this statement, e.g. "Container Health task thread
took {} milliseconds". That way the debug logs show how long a single run
took.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -225,28 +228,20 @@ public OzoneManagerServiceProviderImpl(
new ThreadFactoryBuilder().setNameFormat(threadNamePrefix +
"SyncOM-%d")
.build();
this.reconContext = reconContext;
+ this.reconTaskUpdater = new ReconTaskStatusUpdater(reconTaskStatusDao,
reconTaskStatusCounter);
}
public void registerOMDBTasks() {
- ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(
- OmSnapshotTaskName.OmDeltaRequest.name(),
- System.currentTimeMillis(), getCurrentOMDBSequenceNumber());
- if (!reconTaskStatusDao.existsById(
- OmSnapshotTaskName.OmDeltaRequest.name())) {
- reconTaskStatusDao.insert(reconTaskStatusRecord);
- LOG.info("Registered {} task ",
- OmSnapshotTaskName.OmDeltaRequest.name());
- }
- reconTaskStatusRecord = new ReconTaskStatus(
- OmSnapshotTaskName.OmSnapshotRequest.name(),
- System.currentTimeMillis(), getCurrentOMDBSequenceNumber());
- if (!reconTaskStatusDao.existsById(
- OmSnapshotTaskName.OmSnapshotRequest.name())) {
- reconTaskStatusDao.insert(reconTaskStatusRecord);
- LOG.info("Registered {} task ",
- OmSnapshotTaskName.OmSnapshotRequest.name());
- }
+ reconTaskUpdater.setTaskName(OmSnapshotTaskName.OmDeltaRequest.name());
+ reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+ reconTaskUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
+ reconTaskUpdater.updateDetails();
+
+ reconTaskUpdater.setTaskName(OmSnapshotTaskName.OmSnapshotRequest.name());
+ reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+ reconTaskUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
+ reconTaskUpdater.updateDetails();
Review Comment:
Refactor and reduce code duplication.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/PipelineSyncTask.java:
##########
@@ -84,17 +88,25 @@ public void run() {
}
public void triggerPipelineSyncTask()
- throws IOException, TimeoutException, NodeNotFoundException {
+ throws IOException, NodeNotFoundException {
lock.writeLock().lock();
try {
long start = Time.monotonicNow();
List<Pipeline> pipelinesFromScm = scmClient.getPipelines();
+ taskStatusUpdater.setIsCurrentTaskRunning(1);
+ taskStatusUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
+ taskStatusUpdater.updateDetails();
reconPipelineManager.initializePipelines(pipelinesFromScm);
syncOperationalStateOnDeadNodes();
LOG.debug("Pipeline sync Thread took {} milliseconds.",
Time.monotonicNow() - start);
- recordSingleRunCompletion();
+ taskStatusUpdater.setLastTaskRunStatus(0);
+ } catch (IOException | NodeNotFoundException e) {
+ // If we encounter an exception, increment failure count for task and
bubble forward the exception
+ taskStatusUpdater.setLastTaskRunStatus(-1);
+ throw e;
Review Comment:
We should not throw any exception here: all SCM tasks are triggered as
long-running daemon tasks, and there is currently no mechanism for an SCM task
to recover on its own. If a run fails, we will know from `lastTaskRunStatus`
and the error log.
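A minimal sketch of the "log and record, don't rethrow" pattern described above. `SafeTaskRunner` and `runOnce` are hypothetical names, and `java.util.logging` stands in for the project's SLF4J logger only to keep the example self-contained.

```java
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper, not part of the PR: runs one iteration of a
// background task and records the outcome instead of propagating failures.
class SafeTaskRunner {
  private static final Logger LOG =
      Logger.getLogger(SafeTaskRunner.class.getName());

  // 0 = last run succeeded, -1 = last run failed (same convention as the PR).
  private int lastTaskRunStatus = 0;

  void runOnce(String taskName, Callable<Void> body) {
    try {
      body.call();
      lastTaskRunStatus = 0;
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // Preserve the interrupt flag so the owning thread can still stop.
        Thread.currentThread().interrupt();
      }
      lastTaskRunStatus = -1;
      // Log with the exception object so the stack trace is kept.
      LOG.log(Level.SEVERE, "Run of task " + taskName + " failed.", e);
    }
  }

  int getLastTaskRunStatus() {
    return lastTaskRunStatus;
  }
}
```

With this shape the task method needs no `throws` clause, and a failed run shows up both in the status field and in the error log.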
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/PipelineSyncTask.java:
##########
@@ -84,17 +88,25 @@ public void run() {
}
public void triggerPipelineSyncTask()
- throws IOException, TimeoutException, NodeNotFoundException {
+ throws IOException, NodeNotFoundException {
Review Comment:
Remove the `throws` clause from here once you remove the `throw` statement from the `catch` block.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/PipelineSyncTask.java:
##########
@@ -84,17 +88,25 @@ public void run() {
}
public void triggerPipelineSyncTask()
- throws IOException, TimeoutException, NodeNotFoundException {
+ throws IOException, NodeNotFoundException {
lock.writeLock().lock();
try {
long start = Time.monotonicNow();
List<Pipeline> pipelinesFromScm = scmClient.getPipelines();
+ taskStatusUpdater.setIsCurrentTaskRunning(1);
+ taskStatusUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
+ taskStatusUpdater.updateDetails();
reconPipelineManager.initializePipelines(pipelinesFromScm);
syncOperationalStateOnDeadNodes();
LOG.debug("Pipeline sync Thread took {} milliseconds.",
Time.monotonicNow() - start);
- recordSingleRunCompletion();
+ taskStatusUpdater.setLastTaskRunStatus(0);
+ } catch (IOException | NodeNotFoundException e) {
+ // If we encounter an exception, increment failure count for task and
bubble forward the exception
+ taskStatusUpdater.setLastTaskRunStatus(-1);
Review Comment:
Add an error log here that includes the exception object `e`.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -571,17 +566,21 @@ public boolean syncDataFromOM() {
if (currentSequenceNumber <= 0) {
fullSnapshot = true;
} else {
+ reconTaskUpdater =
reconTaskUpdater.getInstanceWithTask(OmSnapshotTaskName.OmDeltaRequest.name());
+ reconTaskUpdater.setLastUpdatedSeqNumber(currentSequenceNumber);
+ reconTaskUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
try (OMDBUpdatesHandler omdbUpdatesHandler =
new OMDBUpdatesHandler(omMetadataManager)) {
LOG.info("Obtaining delta updates from Ozone Manager");
// Get updates from OM and apply to local Recon OM DB.
+ reconTaskUpdater.setIsCurrentTaskRunning(1);
+
reconTaskUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
Review Comment:
Also, there is no need to call this so frequently. There is hardly any visible
difference between the timestamps set at lines `571` and `577`.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskStatusUpdater.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import org.apache.hadoop.ozone.recon.metrics.ReconTaskStatusCounter;
+import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides utilities to update/modify Recon Task related data
+ * like updating table, incrementing counter etc.
+ */
+public class ReconTaskStatusUpdater {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReconTaskStatusUpdater.class);
+
+ private ReconTaskStatus reconTaskStatus;
+
+ private ReconTaskStatusDao reconTaskStatusDao;
+ private ReconTaskStatusCounter taskStatusCounter;
+
+ private String taskName;
+
+ public ReconTaskStatusUpdater(ReconTaskStatusDao reconTaskStatusDao,
+ ReconTaskStatusCounter reconTaskStatusCounter)
{
+ this.reconTaskStatusDao = reconTaskStatusDao;
+ this.taskStatusCounter = reconTaskStatusCounter;
+ this.reconTaskStatus = new ReconTaskStatus(null, 0L, 0L, 0, 0);
+ }
+
+ public ReconTaskStatusUpdater(ReconTaskStatusDao reconTaskStatusDao,
+ ReconTaskStatusCounter taskStatusCounter,
+ String taskName) {
+ this.taskName = taskName;
+ this.reconTaskStatusDao = reconTaskStatusDao;
+ this.taskStatusCounter = taskStatusCounter;
+ this.reconTaskStatus = new ReconTaskStatus(taskName, 0L, 0L, 0, 0);
+ }
+
+ public void setTaskName(String taskName) {
+ this.taskName = taskName;
+ this.reconTaskStatus.setTaskName(taskName);
+ }
+
+ public void setLastUpdatedSeqNumber(long lastUpdatedSeqNumber) {
+ this.reconTaskStatus.setLastUpdatedSeqNumber(lastUpdatedSeqNumber);
+ }
+
+ public void setLastUpdatedTimestamp(long lastUpdatedTimestamp) {
+ this.reconTaskStatus.setLastUpdatedTimestamp(lastUpdatedTimestamp);
+ }
+
+ public void setLastTaskRunStatus(int lastTaskRunStatus) {
+ this.reconTaskStatus.setLastTaskRunStatus(lastTaskRunStatus);
+ }
+
+ public void setIsCurrentTaskRunning(int isCurrentTaskRunning) {
+ this.reconTaskStatus.setIsCurrentTaskRunning(isCurrentTaskRunning);
+ }
+
+ public ReconTaskStatusUpdater getInstanceWithTask(String name) {
+ return new ReconTaskStatusUpdater(reconTaskStatusDao, taskStatusCounter,
name);
+ }
+
+ public void updateDetails() {
+ if (!reconTaskStatusDao.existsById(this.taskName)) {
+ // First time getting the task, so insert value
+ reconTaskStatusDao.insert(this.reconTaskStatus);
+ LOG.info("Registered Task: {}", this.taskName);
+ } else {
+ // We already have row for the task in the table, update the row
+ reconTaskStatusDao.update(this.reconTaskStatus);
+ if (null != this.reconTaskStatus.getLastTaskRunStatus()) {
+ taskStatusCounter.updateCounter(taskName,
this.reconTaskStatus.getLastTaskRunStatus() > -1);
Review Comment:
Won't this create a race condition if the same `taskStatusCounter` object is
shared among all OM and SCM task threads? As far as I can see, it is an
injected reference shared by all of them.
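One possible way to make a shared counter safe, sketched under assumptions: `ThreadSafeTaskCounter` is a hypothetical stand-in for `ReconTaskStatusCounter` (whose internals are not shown in this hunk), built on `ConcurrentHashMap` and `LongAdder` from the JDK.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the shared counter; not the PR's actual class.
class ThreadSafeTaskCounter {
  // Per-task tallies; LongAdder tolerates concurrent increments.
  private static final class Tally {
    private final LongAdder success = new LongAdder();
    private final LongAdder failure = new LongAdder();
  }

  private final ConcurrentMap<String, Tally> tallies =
      new ConcurrentHashMap<>();

  void updateCounter(String taskName, boolean succeeded) {
    // computeIfAbsent is atomic, so two threads never create two tallies
    // for the same task.
    Tally tally = tallies.computeIfAbsent(taskName, k -> new Tally());
    if (succeeded) {
      tally.success.increment();
    } else {
      tally.failure.increment();
    }
  }

  long getSuccessCount(String taskName) {
    Tally tally = tallies.get(taskName);
    return tally == null ? 0L : tally.success.sum();
  }

  long getFailureCount(String taskName) {
    Tally tally = tallies.get(taskName);
    return tally == null ? 0L : tally.failure.sum();
  }
}
```

If the real counter also resets on a time window, that reset would need the same care; this sketch only covers concurrent increments and reads.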
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -571,17 +566,21 @@ public boolean syncDataFromOM() {
if (currentSequenceNumber <= 0) {
fullSnapshot = true;
} else {
+ reconTaskUpdater =
reconTaskUpdater.getInstanceWithTask(OmSnapshotTaskName.OmDeltaRequest.name());
Review Comment:
Why are we creating a new object on every run? This is quite a memory
overhead, and a lot of objects will be left for GC. Please optimize this.
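A minimal sketch of reusing one updater per task name instead of allocating one per run. `UpdaterRegistry` is a hypothetical helper, and the generic parameter `U` stands in for `ReconTaskStatusUpdater` so the example compiles on its own.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical cache of per-task updaters; U stands in for
// ReconTaskStatusUpdater.
class UpdaterRegistry<U> {
  private final ConcurrentMap<String, U> updaters = new ConcurrentHashMap<>();
  private final Function<String, U> factory;

  UpdaterRegistry(Function<String, U> factory) {
    this.factory = factory;
  }

  // Creates the updater on first use and returns the same instance afterwards.
  U getUpdaterFor(String taskName) {
    return updaters.computeIfAbsent(taskName, factory);
  }
}
```

Each sync cycle would then look up the updater for `OmDeltaRequest` or `OmSnapshotRequest` rather than calling `getInstanceWithTask` again.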
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -590,26 +589,31 @@ public boolean syncDataFromOM() {
Thread.currentThread().interrupt();
} catch (Exception e) {
metrics.incrNumDeltaRequestsFailed();
+ reconTaskUpdater.setLastTaskRunStatus(-1);
LOG.warn("Unable to get and apply delta updates from OM.",
e.getMessage());
fullSnapshot = true;
+ } finally {
Review Comment:
This might not be relevant here, because `OMDeltaRequest` is already complete
at line 579. We don't want to update the status after consuming the OM events;
consuming OM events is a different flow that belongs to the other background OM
tasks.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -83,11 +89,12 @@ public void registerTask(ReconOmTask task) {
reconOmTasks.put(taskName, task);
// Store Task in Task failure tracker.
taskFailureCounter.put(taskName, new AtomicInteger(0));
- // Create DB record for the task.
- ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName,
- 0L, 0L);
- if (!reconTaskStatusDao.existsById(taskName)) {
- reconTaskStatusDao.insert(reconTaskStatusRecord);
+ try {
+ // Create DB record for the task.
+ taskStatusMap.put(taskName, new
ReconTaskStatusUpdater(reconTaskStatusDao, reconTaskStatusCounter, taskName));
+ getUpdaterForTask(taskName).updateDetails();
Review Comment:
Same question here: when data already exists in the ReconTaskStatus table,
aren't we overwriting the existing values? Don't rely on the old code; it may
also be wrong. Kindly explain your understanding.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/PipelineSyncTask.java:
##########
@@ -84,17 +88,25 @@ public void run() {
}
public void triggerPipelineSyncTask()
- throws IOException, TimeoutException, NodeNotFoundException {
+ throws IOException, NodeNotFoundException {
lock.writeLock().lock();
try {
long start = Time.monotonicNow();
List<Pipeline> pipelinesFromScm = scmClient.getPipelines();
+ taskStatusUpdater.setIsCurrentTaskRunning(1);
+ taskStatusUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
+ taskStatusUpdater.updateDetails();
Review Comment:
We repeat the same set of 3 statements in each SCM task. Can we refactor them
into common code to avoid duplication?
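A sketch of what such common code could look like, assuming the three calls are folded into one method on the updater. `RunStatusRecorder`, `recordRunStart` and `recordRunCompletion` are hypothetical names, and `persist()` stands in for the DAO write done by `updateDetails()`.

```java
// Hypothetical helper that folds the repeated setter calls into two methods.
class RunStatusRecorder {
  private int isCurrentTaskRunning;
  private long lastUpdatedTimestamp;
  private int lastTaskRunStatus;
  private int persistCount; // counts stand-in DAO writes

  // Replaces setIsCurrentTaskRunning(1) + setLastUpdatedTimestamp(now)
  // + updateDetails() at the start of every task run.
  void recordRunStart(long nowMillis) {
    isCurrentTaskRunning = 1;
    lastUpdatedTimestamp = nowMillis;
    persist();
  }

  // Mirrors the end-of-run bookkeeping: status, running flag, timestamp.
  void recordRunCompletion(boolean succeeded, long nowMillis) {
    isCurrentTaskRunning = 0;
    lastTaskRunStatus = succeeded ? 0 : -1;
    lastUpdatedTimestamp = nowMillis;
    persist();
  }

  private void persist() {
    persistCount++; // the real code would insert or update the DB row here
  }

  int getIsCurrentTaskRunning() { return isCurrentTaskRunning; }
  long getLastUpdatedTimestamp() { return lastUpdatedTimestamp; }
  int getLastTaskRunStatus() { return lastTaskRunStatus; }
  int getPersistCount() { return persistCount; }
}
```

Each SCM task would then call `recordRunStart(System.currentTimeMillis())` once instead of repeating the three statements.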
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -590,26 +589,31 @@ public boolean syncDataFromOM() {
Thread.currentThread().interrupt();
} catch (Exception e) {
metrics.incrNumDeltaRequestsFailed();
+ reconTaskUpdater.setLastTaskRunStatus(-1);
LOG.warn("Unable to get and apply delta updates from OM.",
e.getMessage());
fullSnapshot = true;
+ } finally {
+ reconTaskUpdater.setIsCurrentTaskRunning(0);
+
reconTaskUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
+ reconTaskUpdater.updateDetails();
}
}
if (fullSnapshot) {
+ reconTaskUpdater =
reconTaskUpdater.getInstanceWithTask(OmSnapshotTaskName.OmSnapshotRequest.name());
Review Comment:
Same here: we should not create a new updater instance every time. A full
snapshot doesn't happen on every run, but there are cases where it does.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerSizeCountTask.java:
##########
@@ -185,7 +192,9 @@ public void process(List<ContainerInfo> containers) {
// For New Container being created
try {
process(container, containerSizeCountMap);
+ taskStatusUpdater.setLastTaskRunStatus(0);
Review Comment:
No, we should not update `lastTaskRunStatus` per container. Either all
containers are processed successfully, or a failure on any single container
marks the whole task run as failed.
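A minimal sketch of deriving one status for the whole run, assuming per-item failures surface as runtime exceptions. `WholeRunStatus.processAll` is a hypothetical helper, not code from `ContainerSizeCountTask`.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: processes every item and reports a single status.
class WholeRunStatus {
  // Returns 0 if every item was processed, -1 if any single item failed.
  static <T> int processAll(List<T> items, Consumer<T> processor) {
    boolean anyFailed = false;
    for (T item : items) {
      try {
        processor.accept(item);
      } catch (RuntimeException e) {
        // Remember the failure but keep going with the remaining items.
        anyFailed = true;
      }
    }
    return anyFailed ? -1 : 0;
  }
}
```

The caller would pass the returned value to `setLastTaskRunStatus` once, after the loop, rather than inside it.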
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -628,10 +633,15 @@ public boolean syncDataFromOM() {
Thread.currentThread().interrupt();
} catch (Exception e) {
metrics.incrNumSnapshotRequestsFailed();
+ reconTaskUpdater.setLastTaskRunStatus(-1);
LOG.error("Unable to update Recon's metadata with new OM DB. ", e);
// Update health status in ReconContext
reconContext.updateHealthStatus(new AtomicBoolean(false));
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
+ } finally {
Review Comment:
Same as above. This might not be relevant here, because `OMSnapshotRequest`
is already complete at line `612`. We don't want to include the task
reinitialization flow, as that is a different flow belonging to the other
background tasks.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -83,11 +89,12 @@ public void registerTask(ReconOmTask task) {
reconOmTasks.put(taskName, task);
// Store Task in Task failure tracker.
taskFailureCounter.put(taskName, new AtomicInteger(0));
- // Create DB record for the task.
- ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName,
- 0L, 0L);
- if (!reconTaskStatusDao.existsById(taskName)) {
- reconTaskStatusDao.insert(reconTaskStatusRecord);
+ try {
+ // Create DB record for the task.
+ taskStatusMap.put(taskName, new
ReconTaskStatusUpdater(reconTaskStatusDao, reconTaskStatusCounter, taskName));
+ getUpdaterForTask(taskName).updateDetails();
+ } catch (Exception e) {
Review Comment:
We should not change the behaviour of the existing flow. This catch affects
the robustness of Recon startup: any failure will only be logged, and Recon
will come up and keep running even with some tasks not registered, which we
don't want.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -107,9 +114,14 @@ public synchronized void
consumeOMEvents(OMUpdateEventBatch events,
try {
if (!events.isEmpty()) {
Collection<Callable<Pair<String, Boolean>>> tasks = new ArrayList<>();
+ ReconTaskStatus reconTaskStatus;
Review Comment:
This variable is not used anywhere.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -168,55 +180,44 @@ private void ignoreFailedTasks(List<String> failedTasks) {
@Override
public synchronized void reInitializeTasks(
ReconOMMetadataManager omMetadataManager) throws InterruptedException {
- try {
- Collection<Callable<Pair<String, Boolean>>> tasks = new ArrayList<>();
- for (Map.Entry<String, ReconOmTask> taskEntry :
- reconOmTasks.entrySet()) {
- ReconOmTask task = taskEntry.getValue();
- tasks.add(() -> task.reprocess(omMetadataManager));
- }
- List<Future<Pair<String, Boolean>>> results =
- executorService.invokeAll(tasks);
- for (Future<Pair<String, Boolean>> f : results) {
+ Collection<Callable<Pair<String, Boolean>>> tasks = new ArrayList<>();
+ for (Map.Entry<String, ReconOmTask> taskEntry :
+ reconOmTasks.entrySet()) {
+ ReconOmTask task = taskEntry.getValue();
+ ReconTaskStatusUpdater taskStatusUpdater =
getUpdaterForTask(task.getTaskName());
+ taskStatusUpdater.setIsCurrentTaskRunning(1);
+ taskStatusUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
+ taskStatusUpdater.updateDetails();
+ tasks.add(() -> task.reprocess(omMetadataManager));
+ }
+ List<Future<Pair<String, Boolean>>> results =
+ executorService.invokeAll(tasks);
+ for (Future<Pair<String, Boolean>> f : results) {
+ try {
String taskName = f.get().getLeft();
+ ReconTaskStatusUpdater taskStatusUpdater = getUpdaterForTask(taskName);
+
taskStatusUpdater.setLastUpdatedSeqNumber(omMetadataManager.getLastSequenceNumberFromDB());
+ taskStatusUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
if (!f.get().getRight()) {
LOG.info("Init failed for task {}.", taskName);
+ taskStatusUpdater.setLastTaskRunStatus(-1);
Review Comment:
Please change the `LOG` statement above to `error` level.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconTaskStatusTableUpgradeAction.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.upgrade;
+
+import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.jooq.DSLContext;
+import org.jooq.exception.DataAccessException;
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static org.hadoop.ozone.recon.codegen.SqlDbUtils.COLUMN_EXISTS_CHECK;
+import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK;
+import static
org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition.RECON_TASK_STATUS_TABLE_NAME;
+
+
+/**
+ * Upgrade action for TASK_STATUS_STATISTICS feature layout change, which adds
+ * <code>last_task_run_status</code> and <code>current_task_run_status</code>
columns to
+ * {@link org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition} in case it
is missing .
+ */
+@UpgradeActionRecon(feature = ReconLayoutFeature.TASK_STATUS_STATISTICS,
+ type = ReconUpgradeAction.UpgradeActionType.FINALIZE)
+public class ReconTaskStatusTableUpgradeAction implements ReconUpgradeAction {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(ReconTaskStatusTableUpgradeAction.class);
+
+ @Override
+ public void execute(ReconStorageContainerManagerFacade scmFacade) throws
DataAccessException {
+ DataSource dataSource = scmFacade.getDataSource();
+ try (Connection conn = dataSource.getConnection()) {
+ if (!TABLE_EXISTS_CHECK.test(conn, RECON_TASK_STATUS_TABLE_NAME)) {
+ return;
+ }
+ DSLContext dslContext = DSL.using(conn);
+
+ // This is a workaround as currently the upgrade action runs even for a
fresh install
+ // TODO: Remove the check once HDDS-11846 is fixed
+ if (!COLUMN_EXISTS_CHECK.apply(conn, RECON_TASK_STATUS_TABLE_NAME,
"last_task_run_status")
+ && !COLUMN_EXISTS_CHECK.apply(conn, RECON_TASK_STATUS_TABLE_NAME,
"current_task_run_status")) {
+ // Add the new columns if it is not already present in the table
+ dslContext.alterTable(RECON_TASK_STATUS_TABLE_NAME)
+ .add(
+ DSL.field(DSL.name("last_task_run_status"),
SQLDataType.INTEGER),
+ DSL.field(DSL.name("current_task_run_status"),
SQLDataType.INTEGER)
+ )
+ .execute();
+ }
+ } catch (SQLException se) {
+ LOG.error("Error while upgrading Recon Task Status table. Message: {}",
se.getMessage());
Review Comment:
Exception should be thrown here with `se` object wrapped in `SQLException`.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -168,55 +180,44 @@ private void ignoreFailedTasks(List<String> failedTasks) {
@Override
public synchronized void reInitializeTasks(
ReconOMMetadataManager omMetadataManager) throws InterruptedException {
- try {
- Collection<Callable<Pair<String, Boolean>>> tasks = new ArrayList<>();
- for (Map.Entry<String, ReconOmTask> taskEntry :
- reconOmTasks.entrySet()) {
- ReconOmTask task = taskEntry.getValue();
- tasks.add(() -> task.reprocess(omMetadataManager));
- }
- List<Future<Pair<String, Boolean>>> results =
- executorService.invokeAll(tasks);
- for (Future<Pair<String, Boolean>> f : results) {
+ Collection<Callable<Pair<String, Boolean>>> tasks = new ArrayList<>();
+ for (Map.Entry<String, ReconOmTask> taskEntry :
+ reconOmTasks.entrySet()) {
+ ReconOmTask task = taskEntry.getValue();
+ ReconTaskStatusUpdater taskStatusUpdater =
getUpdaterForTask(task.getTaskName());
+ taskStatusUpdater.setIsCurrentTaskRunning(1);
+ taskStatusUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
+ taskStatusUpdater.updateDetails();
+ tasks.add(() -> task.reprocess(omMetadataManager));
+ }
+ List<Future<Pair<String, Boolean>>> results =
+ executorService.invokeAll(tasks);
+ for (Future<Pair<String, Boolean>> f : results) {
+ try {
String taskName = f.get().getLeft();
+ ReconTaskStatusUpdater taskStatusUpdater = getUpdaterForTask(taskName);
+
taskStatusUpdater.setLastUpdatedSeqNumber(omMetadataManager.getLastSequenceNumberFromDB());
Review Comment:
This is wrong. We should not update the last updated sequence number until
we are sure the task run succeeded; otherwise we must keep the sequence number
from the last successful run. Please check whether you have made the same
mistake anywhere else.
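A minimal sketch of the rule above: the sequence number advances only on success. `SeqNumberTracker` and `recordRunResult` are hypothetical names used for illustration.

```java
// Hypothetical holder showing "advance the sequence number only on success".
class SeqNumberTracker {
  private long lastUpdatedSeqNumber;
  private int lastTaskRunStatus;

  void recordRunResult(boolean succeeded, long seqNumberReached) {
    if (succeeded) {
      // Only a successful run may move the sequence number forward.
      lastUpdatedSeqNumber = seqNumberReached;
      lastTaskRunStatus = 0;
    } else {
      // Keep the sequence number of the last successful run.
      lastTaskRunStatus = -1;
    }
  }

  long getLastUpdatedSeqNumber() { return lastUpdatedSeqNumber; }
  int getLastTaskRunStatus() { return lastTaskRunStatus; }
}
```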
##########
hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/codegen/SqlDbUtils.java:
##########
@@ -95,4 +96,20 @@ public void write(int b) throws IOException {
LOG.info("{} table already exists, skipping creation.", tableName);
return true;
};
+
+ /**
+ * Helper function to check if a column exists through JOOQ.
+ */
+ public static final TriFunction<Connection, String, String, Boolean>
COLUMN_EXISTS_CHECK =
Review Comment:
This function should be updated to check for multiple columns, so that one DB
call is sufficient.
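One way this could look, sketched with plain JDBC metadata rather than the JOOQ-based helper in the PR: a single `DatabaseMetaData.getColumns` call fetches all column names, and the check for several required columns then runs in memory. `ColumnChecks`, `existingColumns` and `allPresent` are hypothetical names; how table-name case is matched depends on the database, so treat that part as an assumption.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper, not the PR's COLUMN_EXISTS_CHECK.
class ColumnChecks {
  // One metadata call returns every column of the table (names lower-cased).
  // Assumption: tableName is given in the case the database stores it in.
  static Set<String> existingColumns(Connection conn, String tableName)
      throws SQLException {
    Set<String> names = new HashSet<>();
    try (ResultSet rs =
             conn.getMetaData().getColumns(null, null, tableName, null)) {
      while (rs.next()) {
        names.add(rs.getString("COLUMN_NAME").toLowerCase(Locale.ROOT));
      }
    }
    return names;
  }

  // True only if every required column is in the (lower-cased) existing set.
  static boolean allPresent(Set<String> existing,
                            Collection<String> required) {
    for (String column : required) {
      if (!existing.contains(column.toLowerCase(Locale.ROOT))) {
        return false;
      }
    }
    return true;
  }
}
```

The upgrade action could then call `existingColumns` once and test both `last_task_run_status` and `current_task_run_status` against the result.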
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -168,55 +180,44 @@ private void ignoreFailedTasks(List<String> failedTasks) {
@Override
public synchronized void reInitializeTasks(
ReconOMMetadataManager omMetadataManager) throws InterruptedException {
- try {
- Collection<Callable<Pair<String, Boolean>>> tasks = new ArrayList<>();
- for (Map.Entry<String, ReconOmTask> taskEntry :
- reconOmTasks.entrySet()) {
- ReconOmTask task = taskEntry.getValue();
- tasks.add(() -> task.reprocess(omMetadataManager));
- }
- List<Future<Pair<String, Boolean>>> results =
- executorService.invokeAll(tasks);
- for (Future<Pair<String, Boolean>> f : results) {
+ Collection<Callable<Pair<String, Boolean>>> tasks = new ArrayList<>();
+ for (Map.Entry<String, ReconOmTask> taskEntry :
+ reconOmTasks.entrySet()) {
+ ReconOmTask task = taskEntry.getValue();
+ ReconTaskStatusUpdater taskStatusUpdater =
getUpdaterForTask(task.getTaskName());
+ taskStatusUpdater.setIsCurrentTaskRunning(1);
+ taskStatusUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
+ taskStatusUpdater.updateDetails();
+ tasks.add(() -> task.reprocess(omMetadataManager));
Review Comment:
`reInitializeTasks` is not the only method that calls `reprocess` on all OM
tasks. Please trace the complete call hierarchy and handle all flows.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -107,9 +114,14 @@ public synchronized void
consumeOMEvents(OMUpdateEventBatch events,
try {
if (!events.isEmpty()) {
Collection<Callable<Pair<String, Boolean>>> tasks = new ArrayList<>();
+ ReconTaskStatus reconTaskStatus;
for (Map.Entry<String, ReconOmTask> taskEntry :
reconOmTasks.entrySet()) {
ReconOmTask task = taskEntry.getValue();
+ ReconTaskStatusUpdater taskStatusUpdater =
getUpdaterForTask(task.getTaskName());
+ taskStatusUpdater.setIsCurrentTaskRunning(1);
+
taskStatusUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
+ taskStatusUpdater.updateDetails();
Review Comment:
Also, please update the existing info logs in each task's `process` execution
to include the time taken by the task. We already log the task name when a
task is executed, so the existing log just needs the duration added.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -248,14 +249,31 @@ private List<String>
processTaskResults(List<Future<Pair<String, Boolean>>>
List<String> failedTasks = new ArrayList<>();
for (Future<Pair<String, Boolean>> f : results) {
String taskName = f.get().getLeft();
+ ReconTaskStatusUpdater taskStatusUpdater = getUpdaterForTask(taskName);
+
taskStatusUpdater.setLastUpdatedSeqNumber(events.getLastSequenceNumber());
+ taskStatusUpdater.setLastUpdatedTimestamp(System.currentTimeMillis());
Review Comment:
The mechanism for updating `lastUpdatedTimeStamp` for all tasks is
completely flawed. When tasks are submitted to the executor service, we don't
know in which order they will be picked up and executed by the threads.
`Future.get()` is a blocking call, so calling it on the futures in sequence
means one slow-running task can delay the subsequent `Future.get()` calls for
other tasks, even if those tasks have already completed. As a result, the
recorded `lastUpdatedTimeStamp` may not reflect when each task actually
completed. The idea behind the `lastUpdatedTimeStamp` metric/stat is to
understand how much time each task takes in each run cycle, which can then be
easily found using prometheus/grafana from the task metrics we are updating
here. The old code may also have updated `lastUpdatedTimeStamp` incorrectly,
but we never relied on it, and that is the reason for this PR: to update it
correctly.
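A sketch of one way around the blocking `Future.get()` problem: record the completion time and duration inside the callable itself, on the worker thread, so the values do not depend on the order in which results are collected. `TimedTasks` and `timed` are hypothetical names, and the maps stand in for whatever updater or metric the PR ends up using.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical wrapper; the maps stand in for the real status updater.
class TimedTasks {
  final ConcurrentMap<String, Long> completedAtMillis =
      new ConcurrentHashMap<>();
  final ConcurrentMap<String, Long> durationMillis =
      new ConcurrentHashMap<>();

  // Wraps a task so its timing is captured when the task itself finishes,
  // not when some other thread gets around to calling Future.get().
  <T> Callable<T> timed(String taskName, Callable<T> delegate) {
    return () -> {
      long startNanos = System.nanoTime();
      try {
        return delegate.call();
      } finally {
        durationMillis.put(taskName,
            (System.nanoTime() - startNanos) / 1_000_000L);
        completedAtMillis.put(taskName, System.currentTimeMillis());
      }
    };
  }
}
```

The wrapped callables can be passed to `executorService.invokeAll` unchanged; the loop over the futures then only needs to read the success flag.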
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]