devmadhuu commented on code in PR #7517:
URL: https://github.com/apache/ozone/pull/7517#discussion_r1869510680
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java:
##########
@@ -157,7 +157,6 @@ public Void call() throws Exception {
this.reconTaskStatusMetrics =
injector.getInstance(ReconTaskStatusMetrics.class);
-
Review Comment:
This change is unnecessary.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconLastTaskStatusUpgradeAction.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.upgrade;
+
+import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.jooq.DSLContext;
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static org.hadoop.ozone.recon.codegen.SqlDbUtils.COLUMN_EXISTS_CHECK;
+import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK;
+import static
org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition.RECON_TASK_STATUS_TABLE_NAME;
+
+
+/**
+ * Handles the upgrade for {@link
org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition}
+ * in case of missing <code>last_task_successful</code> and
<code>current_task_run_status</code> columns.
+ */
+@UpgradeActionRecon(feature = ReconLayoutFeature.TASK_STATUS_COLUMN_ADDITION,
+ type = ReconUpgradeAction.UpgradeActionType.FINALIZE)
+public class ReconLastTaskStatusUpgradeAction implements ReconUpgradeAction {
Review Comment:
Can we give this class a more meaningful name that reflects the underlying
action? The current name suggests it is only related to LastTaskStatus, but we
are also adding currentTaskStatus.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconLastTaskStatusUpgradeAction.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.upgrade;
+
+import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.jooq.DSLContext;
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static org.hadoop.ozone.recon.codegen.SqlDbUtils.COLUMN_EXISTS_CHECK;
+import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK;
+import static
org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition.RECON_TASK_STATUS_TABLE_NAME;
+
+
+/**
+ * Handles the upgrade for {@link
org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition}
+ * in case of missing <code>last_task_successful</code> and
<code>current_task_run_status</code> columns.
Review Comment:
Correct the comments.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -225,25 +229,30 @@ public OzoneManagerServiceProviderImpl(
new ThreadFactoryBuilder().setNameFormat(threadNamePrefix +
"SyncOM-%d")
.build();
this.reconContext = reconContext;
+ this.deltaUpdateTaskStatus = new ReconTaskStatus(
+ OmSnapshotTaskName.OmDeltaRequest.name(),
+ 0L, 0L, 0, 0);
+ this.snapshotUpdateTaskStatus = new ReconTaskStatus(
+ OmSnapshotTaskName.OmSnapshotRequest.name(),
+ 0L, 0L, 0, 0);
Review Comment:
`lastUpdatedTimestamp` and `lastUpdatedSeqNumber` for `OmDeltaRequest` and
`OmSnapshotRequest` will always be present if the cluster is already running
Recon. Are we not overwriting those values from the previous version here?
Which test has been written to cover this?
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconLastTaskStatusUpgradeAction.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.upgrade;
+
+import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.jooq.DSLContext;
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static org.hadoop.ozone.recon.codegen.SqlDbUtils.COLUMN_EXISTS_CHECK;
+import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK;
+import static
org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition.RECON_TASK_STATUS_TABLE_NAME;
+
+
+/**
+ * Handles the upgrade for {@link
org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition}
+ * in case of missing <code>last_task_successful</code> and
<code>current_task_run_status</code> columns.
+ */
+@UpgradeActionRecon(feature = ReconLayoutFeature.TASK_STATUS_COLUMN_ADDITION,
+ type = ReconUpgradeAction.UpgradeActionType.FINALIZE)
+public class ReconLastTaskStatusUpgradeAction implements ReconUpgradeAction {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(ReconLastTaskStatusUpgradeAction.class);
+
+ @Override
+ public void execute(ReconStorageContainerManagerFacade scmFacade) throws
SQLException {
+ DataSource dataSource = scmFacade.getDataSource();
+ try (Connection conn = dataSource.getConnection()) {
+ if (!TABLE_EXISTS_CHECK.test(conn, RECON_TASK_STATUS_TABLE_NAME)) {
+ return;
+ }
+ DSLContext dslContext = DSL.using(conn);
+
+ if (!COLUMN_EXISTS_CHECK.apply(conn, RECON_TASK_STATUS_TABLE_NAME,
"last_task_run_status")
+ && !COLUMN_EXISTS_CHECK.apply(conn, RECON_TASK_STATUS_TABLE_NAME,
"current_task_run_status")) {
+ // Add the new columns if it is not already present in the table
+ dslContext.alterTable(RECON_TASK_STATUS_TABLE_NAME)
+ .add(
+ DSL.field(DSL.name("last_task_successful"),
SQLDataType.INTEGER),
Review Comment:
This column needs to be corrected.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconScmTask.java:
##########
@@ -88,8 +94,12 @@ public boolean isRunning() {
}
protected void recordSingleRunCompletion() {
- reconTaskStatusDao.update(new ReconTaskStatus(getTaskName(),
- System.currentTimeMillis(), 0L));
+ taskStatusCounter.updateCounter(getTaskName(), true);
+ synchronized (this) {
+
reconTaskStatusRecord.setLastUpdatedTimestamp(System.currentTimeMillis());
+ reconTaskStatusRecord.setLastTaskRunStatus(1);
Review Comment:
I think this is incorrect. We should follow a convention similar to exit
codes: a non-zero code means the task run is marked as a failure, and a zero
code means the task run is marked as a success. Also, the non-zero status code
value should be -1.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -225,25 +229,30 @@ public OzoneManagerServiceProviderImpl(
new ThreadFactoryBuilder().setNameFormat(threadNamePrefix +
"SyncOM-%d")
.build();
this.reconContext = reconContext;
+ this.deltaUpdateTaskStatus = new ReconTaskStatus(
+ OmSnapshotTaskName.OmDeltaRequest.name(),
+ 0L, 0L, 0, 0);
+ this.snapshotUpdateTaskStatus = new ReconTaskStatus(
+ OmSnapshotTaskName.OmSnapshotRequest.name(),
+ 0L, 0L, 0, 0);
+ this.taskStatusCounter = ReconTaskStatusCounter.getCurrentInstance();
}
public void registerOMDBTasks() {
- ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(
- OmSnapshotTaskName.OmDeltaRequest.name(),
- System.currentTimeMillis(), getCurrentOMDBSequenceNumber());
+
deltaUpdateTaskStatus.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+ deltaUpdateTaskStatus.setLastUpdatedTimestamp(System.currentTimeMillis());
Review Comment:
We can create just one variable, as in the previous approach. There is no need
to create two separate variables for delta and full snapshot.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconLayoutFeature.java:
##########
@@ -31,7 +31,8 @@
*/
public enum ReconLayoutFeature {
// Represents the starting point for Recon's layout versioning system.
- INITIAL_VERSION(0, "Recon Layout Versioning Introduction");
+ INITIAL_VERSION(0, "Recon Layout Versioning Introduction"),
+ TASK_STATUS_COLUMN_ADDITION(1, "Adds a new column to the
RECON_TASK_STATUS_TABLE to store last task status");
Review Comment:
Please try to give this a meaningful name. Please refer to my previous comment.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -575,13 +584,15 @@ public boolean syncDataFromOM() {
new OMDBUpdatesHandler(omMetadataManager)) {
LOG.info("Obtaining delta updates from Ozone Manager");
// Get updates from OM and apply to local Recon OM DB.
+ deltaUpdateTaskStatus.setCurrentTaskRunStatus(1);
getAndApplyDeltaUpdatesFromOM(currentSequenceNumber,
omdbUpdatesHandler);
// Update timestamp of successful delta updates query.
- ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(
- OmSnapshotTaskName.OmDeltaRequest.name(),
- System.currentTimeMillis(), getCurrentOMDBSequenceNumber());
- reconTaskStatusDao.update(reconTaskStatusRecord);
+
deltaUpdateTaskStatus.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+ deltaUpdateTaskStatus.setCurrentTaskRunStatus(0);
+ deltaUpdateTaskStatus.setLastTaskRunStatus(1);
Review Comment:
This status code should be zero, as mentioned in the comment above.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -601,15 +617,15 @@ public boolean syncDataFromOM() {
metrics.incrNumSnapshotRequests();
LOG.info("Obtaining full snapshot from Ozone Manager");
// Update local Recon OM DB to new snapshot.
+ snapshotUpdateTaskStatus.setCurrentTaskRunStatus(1);
Review Comment:
Please address the comments given above for the OM full snapshot as well.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -590,9 +601,14 @@ public boolean syncDataFromOM() {
Thread.currentThread().interrupt();
} catch (Exception e) {
metrics.incrNumDeltaRequestsFailed();
+ deltaUpdateTaskStatus.setLastTaskRunStatus(-1);
Review Comment:
We should also update `currentTaskRunStatus` here, as the task has thrown an
exception and has stopped.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -590,9 +601,14 @@ public boolean syncDataFromOM() {
Thread.currentThread().interrupt();
} catch (Exception e) {
metrics.incrNumDeltaRequestsFailed();
+ deltaUpdateTaskStatus.setLastTaskRunStatus(-1);
+
deltaUpdateTaskStatus.setLastUpdatedTimestamp(System.currentTimeMillis());
Review Comment:
This value is set in both the success and failure cases, so can it be updated
in a `finally` block?
The same applies to updating `currentTaskRunStatus`.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -69,6 +72,7 @@ public ReconTaskControllerImpl(OzoneConfiguration
configuration,
threadCount = configuration.getInt(OZONE_RECON_TASK_THREAD_COUNT_KEY,
OZONE_RECON_TASK_THREAD_COUNT_DEFAULT);
this.reconTaskStatusDao = reconTaskStatusDao;
+ taskStatusCounter = ReconTaskStatusCounter.getCurrentInstance();
Review Comment:
Inject and manage this through Guice.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -173,21 +178,27 @@ public synchronized void reInitializeTasks(
for (Map.Entry<String, ReconOmTask> taskEntry :
reconOmTasks.entrySet()) {
ReconOmTask task = taskEntry.getValue();
+ taskStatusMap.get(task.getTaskName()).setCurrentTaskRunStatus(1);
tasks.add(() -> task.reprocess(omMetadataManager));
}
List<Future<Pair<String, Boolean>>> results =
executorService.invokeAll(tasks);
for (Future<Pair<String, Boolean>> f : results) {
String taskName = f.get().getLeft();
+ ReconTaskStatus reconTaskStatusRecord = taskStatusMap.get(taskName);
+
reconTaskStatusRecord.setLastUpdatedSeqNumber(omMetadataManager.getLastSequenceNumberFromDB());
+
reconTaskStatusRecord.setLastUpdatedTimestamp(System.currentTimeMillis());
if (!f.get().getRight()) {
LOG.info("Init failed for task {}.", taskName);
+ reconTaskStatusRecord.setLastTaskRunStatus(-1);
+ taskStatusCounter.updateCounter(taskName, false);
} else {
//store the timestamp for the task
- ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName,
- System.currentTimeMillis(),
- omMetadataManager.getLastSequenceNumberFromDB());
- reconTaskStatusDao.update(reconTaskStatusRecord);
+ reconTaskStatusRecord.setLastTaskRunStatus(1);
Review Comment:
Correct the status code.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconLastTaskStatusUpgradeAction.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.upgrade;
+
+import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.jooq.DSLContext;
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static org.hadoop.ozone.recon.codegen.SqlDbUtils.COLUMN_EXISTS_CHECK;
+import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK;
+import static
org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition.RECON_TASK_STATUS_TABLE_NAME;
+
+
+/**
+ * Handles the upgrade for {@link
org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition}
+ * in case of missing <code>last_task_successful</code> and
<code>current_task_run_status</code> columns.
+ */
+@UpgradeActionRecon(feature = ReconLayoutFeature.TASK_STATUS_COLUMN_ADDITION,
+ type = ReconUpgradeAction.UpgradeActionType.FINALIZE)
+public class ReconLastTaskStatusUpgradeAction implements ReconUpgradeAction {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(ReconLastTaskStatusUpgradeAction.class);
+
+ @Override
+ public void execute(ReconStorageContainerManagerFacade scmFacade) throws
SQLException {
+ DataSource dataSource = scmFacade.getDataSource();
+ try (Connection conn = dataSource.getConnection()) {
+ if (!TABLE_EXISTS_CHECK.test(conn, RECON_TASK_STATUS_TABLE_NAME)) {
+ return;
+ }
+ DSLContext dslContext = DSL.using(conn);
+
+ if (!COLUMN_EXISTS_CHECK.apply(conn, RECON_TASK_STATUS_TABLE_NAME,
"last_task_run_status")
+ && !COLUMN_EXISTS_CHECK.apply(conn, RECON_TASK_STATUS_TABLE_NAME,
"current_task_run_status")) {
+ // Add the new columns if it is not already present in the table
+ dslContext.alterTable(RECON_TASK_STATUS_TABLE_NAME)
+ .add(
+ DSL.field(DSL.name("last_task_successful"),
SQLDataType.INTEGER),
+ DSL.field(DSL.name("current_task_run_status"),
SQLDataType.INTEGER)
+ )
+ .execute();
+ }
+ } catch (SQLException se) {
+ throw new SQLException(
Review Comment:
Is this needed? I think the thrown exception will already carry that
information if adding any of the columns fails. Can you verify it? You are just
catching it and throwing the same exception type with your own custom message.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconScmTask.java:
##########
@@ -88,8 +94,12 @@ public boolean isRunning() {
}
protected void recordSingleRunCompletion() {
Review Comment:
Please check: this method is not being called for `ContainerSizeCountTask`.
Also, in the existing code, for `ContainerHealthTask` this method is called on
each iteration of the loop while processing containers. I think we should
record completion at the end of running the task. IMO, this needs to be
corrected.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconTaskStatusCounter.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.metrics;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_STATUS_STORAGE_DURATION;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_STATUS_STORAGE_DURATION_DEFAULT;
+
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class contains definitions and implementation of Recon Task Status
counters
+ * For each task we maintain a count of the successes and the failures.
+ * This count is stored for a configurable
+ * {@link
org.apache.hadoop.ozone.recon.ReconServerConfigKeys#OZONE_RECON_TASK_STATUS_STORAGE_DURATION}
+ * which defaults to 30 minutes.
+ * Each task is mapped to a {@link Pair} of <code>{no. of successful runs, no.
of failed runs}</code>
+ */
+public class ReconTaskStatusCounter {
+
+ // Stores an instance of this class to maintain state across calls
+ private static ReconTaskStatusCounter instance;
+ // Stores the configurable timeout duration i.e. the TTL of the counts
+ private final long timeoutDuration;
+
+ /**
+ * {@link Enum} to store the various tasks that are run in Recon.
+ */
+ public enum ReconTasks {
+ ContainerHealthTask,
+ ContainerKeyMapperTask,
+ ContainerSizeCountTask,
+ FileSizeCountTask,
+ NSSummaryTask,
+ OmDeltaRequest,
+ OmTableInsightTask,
+ OmSnapshotRequest,
+ PipelineSyncTask,
+ ReconScmTask
Review Comment:
Is this needed ?
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconLastTaskStatusUpgradeAction.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.upgrade;
+
+import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.jooq.DSLContext;
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static org.hadoop.ozone.recon.codegen.SqlDbUtils.COLUMN_EXISTS_CHECK;
+import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK;
+import static
org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition.RECON_TASK_STATUS_TABLE_NAME;
+
+
+/**
+ * Handles the upgrade for {@link
org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition}
+ * in case of missing <code>last_task_successful</code> and
<code>current_task_run_status</code> columns.
+ */
+@UpgradeActionRecon(feature = ReconLayoutFeature.TASK_STATUS_COLUMN_ADDITION,
+ type = ReconUpgradeAction.UpgradeActionType.FINALIZE)
+public class ReconLastTaskStatusUpgradeAction implements ReconUpgradeAction {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(ReconLastTaskStatusUpgradeAction.class);
+
+ @Override
+ public void execute(ReconStorageContainerManagerFacade scmFacade) throws
SQLException {
+ DataSource dataSource = scmFacade.getDataSource();
+ try (Connection conn = dataSource.getConnection()) {
+ if (!TABLE_EXISTS_CHECK.test(conn, RECON_TASK_STATUS_TABLE_NAME)) {
+ return;
+ }
+ DSLContext dslContext = DSL.using(conn);
+
+ if (!COLUMN_EXISTS_CHECK.apply(conn, RECON_TASK_STATUS_TABLE_NAME,
"last_task_run_status")
+ && !COLUMN_EXISTS_CHECK.apply(conn, RECON_TASK_STATUS_TABLE_NAME,
"current_task_run_status")) {
Review Comment:
This should be handled by the upgrade action framework. Please fix the existing
upgrade action framework; then this check may not be needed.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -628,10 +647,14 @@ public boolean syncDataFromOM() {
Thread.currentThread().interrupt();
} catch (Exception e) {
metrics.incrNumSnapshotRequestsFailed();
+ snapshotUpdateTaskStatus.setLastTaskRunStatus(-1);
Review Comment:
All of the lines below are a repeated set of code. Please try to refactor and
avoid code duplication.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconTaskStatusCounter.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.metrics;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_STATUS_STORAGE_DURATION;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_STATUS_STORAGE_DURATION_DEFAULT;
+
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class contains definitions and implementation of Recon Task Status
counters
+ * For each task we maintain a count of the successes and the failures.
+ * This count is stored for a configurable
+ * {@link
org.apache.hadoop.ozone.recon.ReconServerConfigKeys#OZONE_RECON_TASK_STATUS_STORAGE_DURATION}
+ * which defaults to 30 minutes.
+ * Each task is mapped to a {@link Pair} of <code>{no. of successful runs, no.
of failed runs}</code>
+ */
+public class ReconTaskStatusCounter {
+
+ // Stores an instance of this class to maintain state across calls
+ private static ReconTaskStatusCounter instance;
+ // Stores the configurable timeout duration i.e. the TTL of the counts
+ private final long timeoutDuration;
+
+ /**
+ * {@link Enum} to store the various tasks that are run in Recon.
+ */
+ public enum ReconTasks {
+ ContainerHealthTask,
+ ContainerKeyMapperTask,
+ ContainerSizeCountTask,
+ FileSizeCountTask,
+ NSSummaryTask,
+ OmDeltaRequest,
+ OmTableInsightTask,
+ OmSnapshotRequest,
+ PipelineSyncTask,
+ ReconScmTask
+ }
+
+ private long initializationTime = 0L;
+
+ // Task name is mapped from the enum to a Pair of <count of successful runs,
count of failed runs>
+ private static final Map<ReconTasks, Pair<Integer, Integer>>
TASK_STATUS_COUNTER = new EnumMap<>(ReconTasks.class);
+
+ public ReconTaskStatusCounter() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ timeoutDuration = conf.getTimeDuration(
+ OZONE_RECON_TASK_STATUS_STORAGE_DURATION,
+ OZONE_RECON_TASK_STATUS_STORAGE_DURATION_DEFAULT,
+ TimeUnit.MILLISECONDS
+ );
+
+ initializationTime = System.currentTimeMillis();
+ for (ReconTasks task: ReconTasks.values()) {
+ TASK_STATUS_COUNTER.put(task, Pair.of(0, 0));
+ }
+ }
+
+ /**
+ * Get an instance of <code>this</code> {@link ReconTaskStatusCounter} in
order to persist state
+ * of the task counters between multiple modules/packages.
+ * @return an instance of current {@link ReconTaskStatusCounter}
+ */
+ public static ReconTaskStatusCounter getCurrentInstance() {
Review Comment:
It would be better to manage this singleton using the existing Guice framework.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -251,9 +266,14 @@ private List<String>
processTaskResults(List<Future<Pair<String, Boolean>>>
if (!f.get().getRight()) {
LOG.info("Failed task : {}", taskName);
failedTasks.add(f.get().getLeft());
+ taskStatusCounter.updateCounter(taskName, false);
+ LOG.info("Task Name: {}, Counts: {}", taskName,
taskStatusCounter.getTaskStatusCounts(taskName));
+ storeLastCompletedTransaction(taskName,
events.getLastSequenceNumber(), 1);
} else {
taskFailureCounter.get(taskName).set(0);
- storeLastCompletedTransaction(taskName,
events.getLastSequenceNumber());
+ taskStatusCounter.updateCounter(taskName, true);
+ LOG.info("Task Name: {}, Counts: {}", taskName,
taskStatusCounter.getTaskStatusCounts(taskName));
+ storeLastCompletedTransaction(taskName,
events.getLastSequenceNumber(), -1);
Review Comment:
This is the success condition, so the status code is again incorrect as -1.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -173,21 +178,27 @@ public synchronized void reInitializeTasks(
for (Map.Entry<String, ReconOmTask> taskEntry :
reconOmTasks.entrySet()) {
ReconOmTask task = taskEntry.getValue();
+ taskStatusMap.get(task.getTaskName()).setCurrentTaskRunStatus(1);
tasks.add(() -> task.reprocess(omMetadataManager));
}
List<Future<Pair<String, Boolean>>> results =
executorService.invokeAll(tasks);
for (Future<Pair<String, Boolean>> f : results) {
String taskName = f.get().getLeft();
+ ReconTaskStatus reconTaskStatusRecord = taskStatusMap.get(taskName);
+
reconTaskStatusRecord.setLastUpdatedSeqNumber(omMetadataManager.getLastSequenceNumberFromDB());
+
reconTaskStatusRecord.setLastUpdatedTimestamp(System.currentTimeMillis());
if (!f.get().getRight()) {
LOG.info("Init failed for task {}.", taskName);
+ reconTaskStatusRecord.setLastTaskRunStatus(-1);
+ taskStatusCounter.updateCounter(taskName, false);
} else {
//store the timestamp for the task
- ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName,
- System.currentTimeMillis(),
- omMetadataManager.getLastSequenceNumberFromDB());
- reconTaskStatusDao.update(reconTaskStatusRecord);
+ reconTaskStatusRecord.setLastTaskRunStatus(1);
+ taskStatusCounter.updateCounter(taskName, true);
}
+ reconTaskStatusRecord.setCurrentTaskRunStatus(0);
+ reconTaskStatusDao.update(reconTaskStatusRecord);
}
} catch (ExecutionException e) {
LOG.error("Unexpected error : ", e);
Review Comment:
What about failure tracking here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]