szetszwo commented on code in PR #4683:
URL: https://github.com/apache/ozone/pull/4683#discussion_r1205020883


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java:
##########
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hdds.scm.ha;
 
 import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicLong;

Review Comment:
   Could you move it to right above `import java.util.concurrent.locks.ReentrantReadWriteLock;`?



##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java:
##########
@@ -556,11 +556,20 @@ public final class ScmConfigKeys {
           "ozone.scm.ha.ratis.log.purge.gap";
   public static final int OZONE_SCM_HA_RAFT_LOG_PURGE_GAP_DEFAULT = 1000000;
 
+  // the config will transfer value to ratis config
+  // raft.server.snapshot.auto.trigger.threshold

Review Comment:
   Use javadoc comments.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHATransactionBufferMonitorTask.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.  
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A background service running in SCM to check and flush the HA Transaction
+ * buffer.
+ */
+public class SCMHATransactionBufferMonitorTask implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMHATransactionBufferMonitorTask.class);
+  private final SCMRatisServer server;
+  private final SCMHADBTransactionBuffer transactionBuffer;
+  private final long flushInterval;
+
+  /**
+   * SCMService related variables.
+   */
+
+  @SuppressWarnings("parameternumber")

Review Comment:
   Remove `@SuppressWarnings`



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java:
##########
@@ -67,6 +71,7 @@ public class SCMHAManagerImpl implements SCMHAManager {
 
   // this should ideally be started only in a ratis leader
   private final InterSCMGrpcProtocolService grpcServer;
+  private BackgroundSCMService trxBufferMonitorService = null;

Review Comment:
   I see.  We need to have some code refactoring.  Let's do it later.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHATransactionBufferMonitorTask.java:
##########
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A background service running in SCM to check and flush the HA Transaction
+ * buffer.
+ */
+public class SCMHATransactionBufferMonitorTask implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMHATransactionBufferMonitorTask.class);
+  private SCMRatisServer server;
+  private SCMHADBTransactionBuffer transactionBuffer;
+  private long flushInterval = 0;
+
+  /**
+   * SCMService related variables.
+   */
+
+  @SuppressWarnings("parameternumber")
+  public SCMHATransactionBufferMonitorTask(
+      SCMHADBTransactionBuffer transactionBuffer,
+      SCMRatisServer server, long flushInterval) {
+    this.flushInterval = flushInterval;
+    this.transactionBuffer = transactionBuffer;
+    this.server = server;
+  }
+
+  @Override
+  public void run() {
+    if (transactionBuffer.shouldFlush(flushInterval)) {
+      LOG.debug("Running TransactionFlushTask");
+      // set latest snapshot to null for force snapshot
+      // the value will be reset again when snapshot is taken
+      SnapshotInfo lastSnapshot = transactionBuffer.getLatestSnapshot();
+      transactionBuffer.setLatestSnapshot(null);
+      try {
+        server.doSnapshotRequest();
+      } catch (IOException e) {
+        LOG.error("Snapshot request is failed", e);
+      } finally {
+        // under failure case, if unable to take snapshot, its value
+        // is reset to previous known value
+        if (null == transactionBuffer.getLatestSnapshot()) {
+          transactionBuffer.setLatestSnapshot(lastSnapshot);
+        }

Review Comment:
   It is still not working: `get` and `set` are two separate calls, so the check-then-restore is not atomic.  We need to use `AtomicReference.compareAndSet`.
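
   A minimal sketch of the idea, assuming the latest snapshot is kept in an `AtomicReference`. The class `SnapshotHolderSketch`, its method names, and the `String` stand-in for `SnapshotInfo` are hypothetical and only illustrate the pattern; they are not the PR's code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder standing in for the buffer's latestSnapshot field.
// A String is used instead of SnapshotInfo to keep the sketch self-contained.
final class SnapshotHolderSketch {
  private final AtomicReference<String> latestSnapshot;

  SnapshotHolderSketch(String initial) {
    this.latestSnapshot = new AtomicReference<>(initial);
  }

  String get() {
    return latestSnapshot.get();
  }

  void set(String snapshot) {
    latestSnapshot.set(snapshot);
  }

  // Restores the previous value only if the reference is still null.
  // The null check and the write happen as one atomic step, so a snapshot
  // recorded concurrently by another thread is never overwritten.
  boolean restoreIfUnset(String previous) {
    return latestSnapshot.compareAndSet(null, previous);
  }

  public static void main(String[] args) {
    SnapshotHolderSketch holder = new SnapshotHolderSketch(null);
    // Failure case: nothing was recorded, so the old value is put back.
    System.out.println("restored: " + holder.restoreIfUnset("old"));
    // Success case: a new snapshot exists, so the restore is skipped.
    holder.set(null);
    holder.set("new");
    System.out.println("restored: " + holder.restoreIfUnset("old"));
    System.out.println("current: " + holder.get());
  }
}
```

   In the `finally` block this would replace the separate null check and `setLatestSnapshot(lastSnapshot)` call with a single `compareAndSet(null, lastSnapshot)`.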



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java:
##########
@@ -92,6 +97,25 @@ public SCMHAManagerImpl(final ConfigurationSource conf,
     }
 
   }
+  
+  private void createStartTransactionBufferMonitor() {
+    long interval = conf.getTimeDuration(
+        OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL,
+        OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    SCMHATransactionBufferMonitorTask monitorTask
+        = new SCMHATransactionBufferMonitorTask(
+        (SCMHADBTransactionBuffer) transactionBuffer, ratisServer, interval);

Review Comment:
   I see.   Could you move the method to right below the `start()` method?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java:
##########
@@ -44,6 +45,8 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
   private BatchOperation currentBatchOperation;
   private TransactionInfo latestTrxInfo;
   private SnapshotInfo latestSnapshot;
+  private AtomicLong txFlushPending = new AtomicLong(0);

Review Comment:
   Add `final`.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHATransactionBufferMonitorTask.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.  
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A background service running in SCM to check and flush the HA Transaction
+ * buffer.
+ */
+public class SCMHATransactionBufferMonitorTask implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMHATransactionBufferMonitorTask.class);
+  private final SCMRatisServer server;
+  private final SCMHADBTransactionBuffer transactionBuffer;
+  private final long flushInterval;
+
+  /**
+   * SCMService related variables.
+   */
+
+  @SuppressWarnings("parameternumber")
+  public SCMHATransactionBufferMonitorTask(
+      SCMHADBTransactionBuffer transactionBuffer,
+      SCMRatisServer server, long flushInterval) {
+    this.flushInterval = flushInterval;
+    this.transactionBuffer = transactionBuffer;
+    this.server = server;
+  }
+
+  @Override
+  public void run() {
+    if (transactionBuffer.shouldFlush(flushInterval)) {
+      LOG.debug("Running TransactionFlushTask");
+      // set latest snapshot to null for force snapshot
+      // the value will be reset again when snapshot is taken
+      SnapshotInfo lastSnapshot = transactionBuffer.getLatestSnapshot();

Review Comment:
   We should use `AtomicReference.getAndSet(null)`.
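
   A minimal sketch of the suggestion, again assuming the snapshot is held in an `AtomicReference`. `ForcedSnapshotSketch`, its method names, and the `String` stand-in for `SnapshotInfo` are hypothetical, used only to show the read-and-clear step as one atomic call.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the buffer's latestSnapshot field.
final class ForcedSnapshotSketch {
  private final AtomicReference<String> latestSnapshot;

  ForcedSnapshotSketch(String initial) {
    this.latestSnapshot = new AtomicReference<>(initial);
  }

  // Reads the current snapshot and clears it in a single atomic step,
  // instead of a separate get followed by set(null).
  String takeForForcedSnapshot() {
    return latestSnapshot.getAndSet(null);
  }

  String current() {
    return latestSnapshot.get();
  }

  public static void main(String[] args) {
    ForcedSnapshotSketch sketch = new ForcedSnapshotSketch("snapshot-1");
    String previous = sketch.takeForForcedSnapshot();
    System.out.println("previous: " + previous);
    System.out.println("current: " + sketch.current());
  }
}
```

   With a separate get and set, another thread could update the snapshot between the two calls and that update would be lost; `getAndSet(null)` closes that window.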



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

