szetszwo commented on code in PR #4002:
URL: https://github.com/apache/ozone/pull/4002#discussion_r1035152081
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java:
##########
@@ -95,11 +98,11 @@ int currentRatisThreePipelineCount(DatanodeDetails datanodeDetails) {
return null;
}
})
- .filter(this::isNonClosedRatisThreePipeline)
+ .filter(PipelinePlacementPolicy::isNonClosedRatisThreePipeline)
.count();
}
- private boolean isNonClosedRatisThreePipeline(Pipeline p) {
+ public static boolean isNonClosedRatisThreePipeline(Pipeline p) {
Review Comment:
We should just add `static` and keep it `private`.
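For example (only the declaration is the actual suggestion; the body below the first line is a sketch, since the diff truncates it, and assumes `RatisReplicationConfig` and `HddsProtos.ReplicationFactor` imports):
```java
// static is enough for the method reference
// PipelinePlacementPolicy::isNonClosedRatisThreePipeline used in the stream;
// the helper can stay private since it is only used inside this class.
private static boolean isNonClosedRatisThreePipeline(Pipeline p) {
  // Illustrative body; only the first line is visible in the diff.
  return p.getReplicationConfig()
      .equals(RatisReplicationConfig.getInstance(ReplicationFactor.THREE))
      && p.getPipelineState() != Pipeline.PipelineState.CLOSED;
}
```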
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java:
##########
@@ -361,6 +365,7 @@ public List<DatanodeDetails> getResultSet(
return results;
}
+
Review Comment:
Please revert this whitespace change.
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java:
##########
@@ -95,11 +98,11 @@ int currentRatisThreePipelineCount(DatanodeDetails datanodeDetails) {
return null;
}
})
- .filter(this::isNonClosedRatisThreePipeline)
+ .filter(PipelinePlacementPolicy::isNonClosedRatisThreePipeline)
.count();
}
- private boolean isNonClosedRatisThreePipeline(Pipeline p) {
+ public static boolean isNonClosedRatisThreePipeline(Pipeline p) {
return p.getReplicationConfig()
Review Comment:
It should check for null since the code in `currentRatisThreePipelineCount` may pass null.
```java
return p != null && p.getReplicationConfig()
```
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java:
##########
@@ -77,8 +82,14 @@ public RatisPipelineProvider(NodeManager nodeManager,
this.conf = conf;
this.eventPublisher = eventPublisher;
this.scmContext = scmContext;
- this.placementPolicy =
- new PipelinePlacementPolicy(nodeManager, stateManager, conf);
+ try {
+ this.placementPolicy = PipelinePlacementPolicyFactory
+ .getPolicy(conf, nodeManager, stateManager,
+ nodeManager.getClusterNetworkTopologyMap(), true, null);
+ } catch (Exception e) {
+ this.placementPolicy = null;
+ LOG.info("Cannot create pipeline policy for pipeline, {}", e);
Review Comment:
Just throw the exception out. Otherwise, `placementPolicy` stays null and will cause an NPE later.
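For example, a sketch of what I mean (assuming the constructor can declare `throws SCMException`, the exception type `getPolicy` already declares):
```java
// No try/catch: if the factory fails, fail the provider construction
// instead of continuing with placementPolicy == null.
this.placementPolicy = PipelinePlacementPolicyFactory
    .getPolicy(conf, nodeManager, stateManager,
        nodeManager.getClusterNetworkTopologyMap(), true, null);
```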
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java:
##########
@@ -59,7 +63,8 @@
private final ConfigurationSource conf;
private final EventPublisher eventPublisher;
- private final PipelinePlacementPolicy placementPolicy;
+ //private final PipelinePlacementPolicy placementPolicy;
+ private PlacementPolicy placementPolicy = null;
Review Comment:
Keep it `final` and remove the commented-out line above.
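A minimal sketch of the suggested declaration (the field is assigned exactly once in the constructor, which still works once the try/catch above is removed):
```java
// Keep the field final; it is assigned once in the constructor.
private final PlacementPolicy placementPolicy;
```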
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicyFactory.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackScatter;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY;
+
+/**
+ * Pipeline placement factory for pipeline providers to create a placement
+ * instance based on the configuration property
+ * {@link ScmConfigKeys#OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY}.
+ */
+public final class PipelinePlacementPolicyFactory {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PipelinePlacementPolicyFactory.class);
+
+ private static final Class<? extends PlacementPolicy>
+ OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT =
+ SCMContainerPlacementRackScatter.class;
+
+ private PipelinePlacementPolicyFactory() {
+ }
+
+ public static PlacementPolicy getPolicy(
+ ConfigurationSource conf, final NodeManager nodeManager,
+ final PipelineStateManager stateManager,
+ NetworkTopology clusterMap, final boolean fallback,
+ SCMContainerPlacementMetrics metrics) throws SCMException {
+
+ Class<? extends PlacementPolicy> placementClass =
+ PipelinePlacementPolicy.class;
+ Constructor<? extends PlacementPolicy> constructor;
+
+ try {
+ if (conf.get(OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY)
+ .equals(SCMContainerPlacementRackScatter.class.getCanonicalName())) {
+ placementClass = conf.getClass(ScmConfigKeys
+ .OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+ OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
+ PlacementPolicy.class);
+
+ constructor = placementClass.getDeclaredConstructor(NodeManager.class,
+ ConfigurationSource.class, NetworkTopology.class, boolean.class,
+ SCMContainerPlacementMetrics.class);
+ LOG.info("Create pipeline placement policy of type {}",
+ placementClass.getCanonicalName());
+ return (constructor.newInstance(nodeManager, conf, clusterMap,
+ fallback, metrics));
+ }
+ constructor = placementClass.getDeclaredConstructor(NodeManager.class,
+ PipelineStateManager.class, ConfigurationSource.class);
+ LOG.info("Create pipeline placement policy of type {}",
+ placementClass.getCanonicalName());
+ return (constructor.newInstance(nodeManager, stateManager, conf));
Review Comment:
We should just use reflection to create a `PlacementPolicy` and not make
`SCMContainerPlacementRackScatter` a special case.
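A rough sketch of what a uniform, reflection-only `getPolicy` could look like. This is only an illustration: the two constructor shapes tried below, the default class, and the reuse of the `FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY` result code are assumptions, not something this PR defines.
```java
public static PlacementPolicy getPolicy(
    ConfigurationSource conf, final NodeManager nodeManager,
    final PipelineStateManager stateManager,
    NetworkTopology clusterMap, final boolean fallback,
    SCMContainerPlacementMetrics metrics) throws SCMException {
  // Resolve the implementation from configuration only; no class is
  // treated as a special case. The default chosen here is illustrative.
  final Class<? extends PlacementPolicy> placementClass = conf.getClass(
      ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
      PipelinePlacementPolicy.class, PlacementPolicy.class);
  LOG.info("Creating pipeline placement policy of type {}",
      placementClass.getCanonicalName());
  try {
    try {
      // Container-placement style constructor.
      return placementClass
          .getDeclaredConstructor(NodeManager.class, ConfigurationSource.class,
              NetworkTopology.class, boolean.class,
              SCMContainerPlacementMetrics.class)
          .newInstance(nodeManager, conf, clusterMap, fallback, metrics);
    } catch (NoSuchMethodException e) {
      // Pipeline-placement style constructor.
      return placementClass
          .getDeclaredConstructor(NodeManager.class,
              PipelineStateManager.class, ConfigurationSource.class)
          .newInstance(nodeManager, stateManager, conf);
    }
  } catch (ReflectiveOperationException e) {
    LOG.error("Failed to instantiate {}",
        placementClass.getCanonicalName(), e);
    // Result code reused from the container placement factory as a
    // placeholder; a dedicated code could be added instead.
    throw new SCMException("Failed to instantiate "
        + placementClass.getCanonicalName(),
        SCMException.ResultCodes.FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY);
  }
}
```
That way any `PlacementPolicy` configured via `OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY` is handled the same way, and the factory does not need to know about `SCMContainerPlacementRackScatter` at all.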
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java:
##########
@@ -163,6 +174,12 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
containerSizeBytes);
break;
case THREE:
+ List<DatanodeDetails> excludeDueToEngagement =
+ filterPipelineEngagement();
+ if (excludeDueToEngagement.size() > 0 && excludedNodes.size() == 0) {
+ excludedNodes = excludeDueToEngagement;
+ }
+ excludedNodes.addAll(filterPipelineEngagement());
Review Comment:
Why call `filterPipelineEngagement()` twice? When the first branch replaces `excludedNodes` with `excludeDueToEngagement`, the `addAll` right after adds the same nodes again.
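If the goal is just to exclude heavily engaged nodes, one call should be enough; a sketch (assuming `excludedNodes` is mutable here, which the existing `addAll` already implies):
```java
case THREE:
  // Compute the engagement-based exclusions once and merge them in.
  List<DatanodeDetails> excludeDueToEngagement = filterPipelineEngagement();
  if (!excludeDueToEngagement.isEmpty()) {
    excludedNodes.addAll(excludeDueToEngagement);
  }
```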
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.