sumitagrawl commented on code in PR #7517: URL: https://github.com/apache/ozone/pull/7517#discussion_r1900693302
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ReconTaskStatusResponse.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api.types; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Class to represent the API response structure of task status statistics. + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class ReconTaskStatusResponse { Review Comment: We do not need this object; ReconTaskStatus already exists and has the same members. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java: ########## @@ -185,7 +185,6 @@ public final class ReconServerConfigKeys { public static final int OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT = 3; - Review Comment: revert the change ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/updater/ReconTaskStatusUpdaterManager.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks.updater; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; +import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class provides caching for ReconTaskStatusUpdater instances. + * For each task we maintain a map of updater instance and provide it to consumers + * to update. 
+ * Here we also make a single call to the TASK_STATUS_TABLE to check if previous values are present + * for a task in the DB to avoid overwrite to initial state + */ +@Singleton +public class ReconTaskStatusUpdaterManager { + private final ReconTaskStatusDao reconTaskStatusDao; + // Act as a cache for the task updater instancesF + private final ConcurrentHashMap<String, ReconTaskStatusUpdater> updaterCache; + + @Inject + public ReconTaskStatusUpdaterManager( + ReconTaskStatusDao reconTaskStatusDao + ) { + this.reconTaskStatusDao = reconTaskStatusDao; + this.updaterCache = new ConcurrentHashMap<>(); + + // Fetch the tasks present in the DB already + List<ReconTaskStatus> tasks = reconTaskStatusDao.findAll(); + for (ReconTaskStatus task: tasks) { + updaterCache.put(task.getTaskName(), + new ReconTaskStatusUpdater(reconTaskStatusDao, task.getTaskName(), + task.getLastUpdatedTimestamp(), task.getLastUpdatedSeqNumber(), + task.getLastTaskRunStatus(), task.getIsCurrentTaskRunning() + )); + } + } + + /** + * Gets the updater for the provided task name and updates DB with initial values + * if the task is not already present in DB. + * @param taskName The name of the task for which we want to get instance of the updater + * @return An instance of {@link ReconTaskStatusUpdater} for the provided task name. + */ + public ReconTaskStatusUpdater getTaskStatusUpdater(String taskName) { + // If the task is not already present in the DB then we can initialize using initial values + return updaterCache.computeIfAbsent(taskName, (name) -> { + ReconTaskStatusUpdater taskStatusUpdater = new ReconTaskStatusUpdater( + reconTaskStatusDao, name); + // Insert initial values into DB + taskStatusUpdater.updateDetails(); Review Comment: save to db called twice on startup if not present. 
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconTaskStatusTableUpgradeAction.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.upgrade; + +import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade; +import org.jooq.DSLContext; +import org.jooq.exception.DataAccessException; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; + +import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK; +import static org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition.RECON_TASK_STATUS_TABLE_NAME; + + +/** + * Upgrade action for TASK_STATUS_STATISTICS feature layout change, which adds + * <code>last_task_run_status</code> and <code>current_task_run_status</code> columns to + * {@link org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition} in case it is missing . 
+ */ +@UpgradeActionRecon(feature = ReconLayoutFeature.TASK_STATUS_STATISTICS, + type = ReconUpgradeAction.UpgradeActionType.FINALIZE) +public class ReconTaskStatusTableUpgradeAction implements ReconUpgradeAction { + + public static final Logger LOG = LoggerFactory.getLogger(ReconTaskStatusTableUpgradeAction.class); + + /** + * Utility function to add provided column to RECON_TASK_STATUS table as INTEGER type. + * @param dslContext Stores {@link DSLContext} to perform alter operations + * @param columnName Name of the column to be inserted to the table + */ + private void addColumnToTable(DSLContext dslContext, String columnName) { + //Column is set as nullable to avoid any errors. + dslContext.alterTable(RECON_TASK_STATUS_TABLE_NAME) + .addColumn(columnName, SQLDataType.INTEGER.nullable(true)).execute(); + } + + /** + * Utility function to set the provided column as Non-Null to enforce constraints in RECON_TASK_STATUS table. + * @param dslContext Stores {@link DSLContext} to perform alter operations + * @param columnName Name of the column to set as non-null + */ + private void setColumnAsNonNullable(DSLContext dslContext, String columnName) { + dslContext.alterTable(RECON_TASK_STATUS_TABLE_NAME) + .alterColumn(columnName).setNotNull().execute(); + } + + @Override + public void execute(ReconStorageContainerManagerFacade scmFacade) throws DataAccessException { + DataSource dataSource = scmFacade.getDataSource(); + try (Connection conn = dataSource.getConnection()) { + if (!TABLE_EXISTS_CHECK.test(conn, RECON_TASK_STATUS_TABLE_NAME)) { + return; + } + + DSLContext dslContext = DSL.using(conn); + // JOOQ doesn't support Derby DB officially, there is no way to run 'ADD COLUMN' command in single call + // for multiple columns. Hence, we run it as two separate steps. 
+ LOG.info("Adding 'last_task_run_status' column to task status table"); + addColumnToTable(dslContext, "last_task_run_status"); + LOG.info("Adding 'current_task_run_status' column to task status table"); Review Comment: The log message should say is_current_task_running, matching the column that is actually added. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java: ########## @@ -135,6 +134,8 @@ public class OzoneManagerServiceProviderImpl private final String threadNamePrefix; private ThreadFactory threadFactory; private ReconContext reconContext; + private ReconTaskStatusUpdater reconTaskUpdater; Review Comment: There is no use case for reconTaskUpdater being a member variable; it is only used locally within a method. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconScmTask.java: ########## @@ -100,5 +122,9 @@ public String getTaskName() { return getClass().getSimpleName(); } + public ReconTaskStatusUpdater getTaskStatusUpdater() { Review Comment: We can make runTask() abstract instead of run(), and have the start/end task status updates be part of run() - update start task - call runTask() - finish update task status ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java: ########## @@ -225,28 +227,20 @@ public OzoneManagerServiceProviderImpl( new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "SyncOM-%d") .build(); this.reconContext = reconContext; + this.taskStatusUpdaterManager = taskStatusUpdaterManager; } public void registerOMDBTasks() { - ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus( - OmSnapshotTaskName.OmDeltaRequest.name(), - System.currentTimeMillis(), getCurrentOMDBSequenceNumber()); - if (!reconTaskStatusDao.existsById( - OmSnapshotTaskName.OmDeltaRequest.name())) { - reconTaskStatusDao.insert(reconTaskStatusRecord); - LOG.info("Registered {} task ", - OmSnapshotTaskName.OmDeltaRequest.name()); - } +
registerTask(OmSnapshotTaskName.OmSnapshotRequest.name()); + registerTask(OmSnapshotTaskName.OmDeltaRequest.name()); + } - reconTaskStatusRecord = new ReconTaskStatus( - OmSnapshotTaskName.OmSnapshotRequest.name(), - System.currentTimeMillis(), getCurrentOMDBSequenceNumber()); - if (!reconTaskStatusDao.existsById( - OmSnapshotTaskName.OmSnapshotRequest.name())) { - reconTaskStatusDao.insert(reconTaskStatusRecord); - LOG.info("Registered {} task ", - OmSnapshotTaskName.OmSnapshotRequest.name()); - } + private void registerTask(String taskName) { + reconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName); + reconTaskUpdater.setTaskName(taskName); + reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber()); + reconTaskUpdater.setLastUpdatedTimestamp(System.currentTimeMillis()); Review Comment: When starting a task, why is lastUpdatedTimestamp set to the current time? It should be obtained from the DB. I think this logic of getting it from the DB should be moved inside ReconTaskUpdater itself. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java: ########## @@ -571,17 +565,36 @@ public boolean syncDataFromOM() { if (currentSequenceNumber <= 0) { fullSnapshot = true; } else { + reconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater( + OmSnapshotTaskName.OmDeltaRequest.name()); + reconTaskUpdater.setLastUpdatedSeqNumber(currentSequenceNumber); Review Comment: On completion, lastUpdatedSeqNumber is already updated; why does it need to be set again? ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/updater/ReconTaskStatusUpdaterManager.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks.updater; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; +import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class provides caching for ReconTaskStatusUpdater instances. + * For each task we maintain a map of updater instance and provide it to consumers + * to update. + * Here we also make a single call to the TASK_STATUS_TABLE to check if previous values are present + * for a task in the DB to avoid overwrite to initial state + */ +@Singleton +public class ReconTaskStatusUpdaterManager { + private final ReconTaskStatusDao reconTaskStatusDao; + // Act as a cache for the task updater instancesF + private final ConcurrentHashMap<String, ReconTaskStatusUpdater> updaterCache; Review Comment: Map<> = new ConcurrentHashMap<>(); -- reference should be base class ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ReconTaskStatusStat.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api.types; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class provides the model for storing the statistics for the + * various tasks that are run by Recon. + */ +public class ReconTaskStatusStat { Review Comment: This is no longer used; remove it and the related test code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
