kerneltime commented on code in PR #7406:
URL: https://github.com/apache/ozone/pull/7406#discussion_r1868725352


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java:
##########
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.ratis.execution;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
+import org.apache.hadoop.ozone.om.helpers.OMAuditLogger;
+import org.apache.hadoop.ozone.om.ratis.execution.request.ExecutionContext;
+import org.apache.hadoop.ozone.om.ratis.execution.request.OMRequestExecutor;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ozone Manager leader-side request executor.
+ * Execution proceeds in the following stages:
+ * 1. The request is executed (processed) and submitted to the merge pool.
+ * 2. Requests are picked up in bulk and their db changes are merged into a single request.
+ * 3. The merged request is submitted to ratis to be persisted to db.
+ */
+public class LeaderRequestExecutor {
+  // Class logger.
+  private static final Logger LOG = 
LoggerFactory.getLogger(LeaderRequestExecutor.class);
+  // Owning OzoneManager; supplies configuration, handlers and leader state.
+  private final OzoneManager ozoneManager;
+  // Shared counter stamping each request with a unique execution index.
+  private final AtomicLong uniqueIndex;
+  // Gate checked on submit(); defaults to true, toggled via setEnabled()/start().
+  private final AtomicBoolean isEnabled = new AtomicBoolean(true);
+  // Handler used to process incoming OM requests.
+  private final OzoneManagerRequestHandler handler;
+  // Byte budget for a merged ratis request (80% of the configured appender byte limit).
+  private final int ratisByteLimit;
+  // Number of merge-pool workers ("ozone.om.leader.merge.pool.size", default 1).
+  private final int mergeTaskPoolSize;
+  // Counter over merge pools; usage not visible in this excerpt — verify against full file.
+  private final AtomicInteger mergeCurrentPool = new AtomicInteger(0);
+  // Two-stage pool: merges request db changes, then submits the merged result to ratis.
+  private final PoolExecutor<RequestContext, RatisContext> requestMerger;
+  // Ratis client id — presumably used when submitting merged requests; confirm in full file.
+  private final ClientId clientId = ClientId.randomId();
+
+  public LeaderRequestExecutor(OzoneManager om, AtomicLong uniqueIndex) {
+    this.ozoneManager = om;
+    this.uniqueIndex = uniqueIndex;
+    this.handler = new OzoneManagerRequestHandler(ozoneManager);
+    this.mergeTaskPoolSize = 
om.getConfiguration().getInt("ozone.om.leader.merge.pool.size", 1);
+    int mergeTaskQueueSize = 
om.getConfiguration().getInt("ozone.om.leader.merge.queue.size", 1000);
+    requestMerger = new PoolExecutor<>(mergeTaskPoolSize, mergeTaskQueueSize,
+        ozoneManager.getThreadNamePrefix() + "-LeaderMerger", 
this::requestMergeCommand, this::ratisSubmit);
+    int limit = (int) ozoneManager.getConfiguration().getStorageSize(
+        OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
+        OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
+        StorageUnit.BYTES);
+    // always go to 90% of max limit for request as other header will be added
+    this.ratisByteLimit = (int) (limit * 0.8);
+  }
+
+  public void start() {
+    isEnabled.set(true);
+    requestMerger.start();
+  }
+  public void stop() {
+    requestMerger.stop();
+  }
+
+  public void setEnabled(boolean enabled) {
+    isEnabled.set(enabled);
+  }
+
+  public boolean isEnabled() {
+    return isEnabled.get();
+  }
+
+  public void submit(RequestContext ctx) throws InterruptedException, 
IOException {
+    if (!isEnabled.get()) {
+      rejectRequest(Collections.singletonList(ctx));
+      return;
+    }
+    executeRequest(ctx, this::mergeSubmit);
+  }
+
+  private void rejectRequest(Collection<RequestContext> ctxs) {
+    Throwable th;
+    if (!ozoneManager.isLeaderReady()) {
+      String peerId = ozoneManager.isRatisEnabled() ? 
ozoneManager.getOmRatisServer().getRaftPeerId().toString()
+          : ozoneManager.getOMNodeId();
+      th = new OMLeaderNotReadyException(peerId + " is not ready to process 
request yet.");
+    } else {
+      th = new OMException("Request processing is disabled due to error", 
OMException.ResultCodes.INTERNAL_ERROR);
+    }
+    handleBatchUpdateComplete(ctxs, th, null);
+  }
+
+  private void executeRequest(RequestContext ctx, 
PoolExecutor.CheckedConsumer<RequestContext> nxtStage) {
+    OMRequest request = ctx.getRequest();
+    ExecutionContext executionContext = new ExecutionContext();
+    executionContext.setIndex(uniqueIndex.incrementAndGet());
+    ctx.setExecutionContext(executionContext);
+    try {
+      handleRequest(ctx, executionContext);
+    } catch (IOException e) {
+      LOG.warn("Failed to write, Exception occurred ", e);
+      ctx.setResponse(createErrorResponse(request, e));
+    } catch (Throwable e) {
+      LOG.warn("Failed to write, Exception occurred ", e);
+      ctx.setResponse(createErrorResponse(request, new IOException(e)));
+    } finally {
+      if 
(!ctx.getRequestExecutor().changeRecorder().getTableRecordsMap().isEmpty()) {
+        // submit to next stage for db changes
+        try {
+          nxtStage.accept(ctx);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      } else {
+        handleBatchUpdateComplete(Collections.singletonList(ctx), null, null);
+      }
+    }
+  }
+
+  private void handleRequest(RequestContext ctx, ExecutionContext exeCtx) 
throws IOException {
+    OMRequestExecutor omClientRequest = ctx.getRequestExecutor();

Review Comment:
   ```suggestion
       OMRequestExecutor omRequestExecutor = ctx.getRequestExecutor();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to