KarmaGYZ commented on a change in pull request #18087: URL: https://github.com/apache/flink/pull/18087#discussion_r771172130
########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/ContainerRequestReflector.java ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.StringUtils; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +/** + * Use reflection to determine whether the Hadoop supports node-label, depending on the Hadoop + * version, may or may not be supported. If not, nothing happened. + * + * <p>The node label mechanism is supported by Hadoop version greater than 2.6.0 + */ +class ContainerRequestReflector { + + private static final Logger LOG = LoggerFactory.getLogger(ContainerRequestReflector.class); + + static final ContainerRequestReflector INSTANCE = new ContainerRequestReflector(); + + @Nullable private Constructor<? extends AMRMClient.ContainerRequest> defaultConstructor; + + private ContainerRequestReflector() { + this(AMRMClient.ContainerRequest.class); + } + + @VisibleForTesting + ContainerRequestReflector(Class<? extends AMRMClient.ContainerRequest> containerRequestClass) { + Class<? extends AMRMClient.ContainerRequest> requestCls = containerRequestClass; + try { + /** + * To support node-label, using the below constructor. Please refer to + * https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java#L287 + * + * <p>Instantiates a {@link ContainerRequest} with the given constraints. + * + * @param capability The {@link Resource} to be requested for each container. + * @param nodes Any hosts to request that the containers are placed on. + * @param racks Any racks to request that the containers are placed on. The racks + * corresponding to any hosts requested will be automatically added to this list. + * @param priority The priority at which to request the containers. Higher priorities + * have lower numerical values. + * @param allocationRequestId The allocationRequestId of the request. To be used as a + * tracking id to match Containers allocated against this request. Will default to 0 + * if not specified. + * @param relaxLocality If true, containers for this request may be assigned on hosts + * and racks other than the ones explicitly requested. + * @param nodeLabelsExpression Set node labels to allocate resource, now we only support + * asking for only a single node label + */ + defaultConstructor = + requestCls.getDeclaredConstructor( + Resource.class, + String[].class, + String[].class, + Priority.class, + boolean.class, + String.class); + } catch (NoSuchMethodException exception) { + LOG.info("The node-label mechanism of Yarn don't be supported in this Hadoop version."); Review comment: ```suggestion LOG.debug("The node-label mechanism of Yarn don't be supported in this Hadoop version."); ``` ########## File path: flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.junit.Test; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +/** Tests for {@link ContainerRequestReflector}. */ +public class ContainerRequestReflectorTest extends TestLogger { + + @Test + public void testGetContainerRequestIfConstructorPresent() { + final ContainerRequestReflector containerRequestReflector = + new ContainerRequestReflector(ContainerRequestWithConstructor.class); + Resource resource = Resource.newInstance(100, 1); + Priority priority = Priority.newInstance(1); + + AMRMClient.ContainerRequest containerRequest = + containerRequestReflector.getContainerRequest(resource, priority, "GPU"); + assertTrue(containerRequest instanceof ContainerRequestWithConstructor); + ContainerRequestWithConstructor containerRequestWithConstructor = + (ContainerRequestWithConstructor) containerRequest; + assertEquals("GPU", containerRequestWithConstructor.getNodeLabelsExpression()); + + containerRequest = containerRequestReflector.getContainerRequest(resource, priority, null); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + assertFalse(containerRequest instanceof ContainerRequestWithConstructor); Review comment: Assert that `containerRequestWithConstructor.getNodeLabelsExpression()` is null would be good enough. ########## File path: flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.junit.Test; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +/** Tests for {@link ContainerRequestReflector}. */ +public class ContainerRequestReflectorTest extends TestLogger { + + @Test + public void testGetContainerRequestIfConstructorPresent() { + final ContainerRequestReflector containerRequestReflector = + new ContainerRequestReflector(ContainerRequestWithConstructor.class); + Resource resource = Resource.newInstance(100, 1); + Priority priority = Priority.newInstance(1); + + AMRMClient.ContainerRequest containerRequest = + containerRequestReflector.getContainerRequest(resource, priority, "GPU"); + assertTrue(containerRequest instanceof ContainerRequestWithConstructor); + ContainerRequestWithConstructor containerRequestWithConstructor = + (ContainerRequestWithConstructor) containerRequest; + assertEquals("GPU", containerRequestWithConstructor.getNodeLabelsExpression()); + + containerRequest = containerRequestReflector.getContainerRequest(resource, priority, null); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + assertFalse(containerRequest instanceof ContainerRequestWithConstructor); + + containerRequest = containerRequestReflector.getContainerRequest(resource, priority, ""); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + assertFalse(containerRequest instanceof ContainerRequestWithConstructor); Review comment: Assert that `containerRequestWithConstructor.getNodeLabelsExpression()` is null would be good enough. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java ########## @@ -296,6 +296,14 @@ .noDefaultValue() .withDescription("Specify YARN node label for the YARN application."); + public static final ConfigOption<String> TASK_MANAGER_NODE_LABEL = + key("yarn.taskmanager.node-label") + .stringType() + .noDefaultValue() + .withDescription( + "Specify YARN node label for the Flink TaskManagers, " + + "it will override the yarn.application.node-label if both are set."); Review comment: nit: From my understanding, it will only override the node label for TMs. It would be good to clarify it. ########## File path: flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.junit.Test; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +/** Tests for {@link ContainerRequestReflector}. */ +public class ContainerRequestReflectorTest extends TestLogger { + + @Test + public void testGetContainerRequestIfConstructorPresent() { + final ContainerRequestReflector containerRequestReflector = + new ContainerRequestReflector(ContainerRequestWithConstructor.class); + Resource resource = Resource.newInstance(100, 1); + Priority priority = Priority.newInstance(1); + + AMRMClient.ContainerRequest containerRequest = + containerRequestReflector.getContainerRequest(resource, priority, "GPU"); + assertTrue(containerRequest instanceof ContainerRequestWithConstructor); + ContainerRequestWithConstructor containerRequestWithConstructor = + (ContainerRequestWithConstructor) containerRequest; + assertEquals("GPU", containerRequestWithConstructor.getNodeLabelsExpression()); + + containerRequest = containerRequestReflector.getContainerRequest(resource, priority, null); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + assertFalse(containerRequest instanceof ContainerRequestWithConstructor); + + containerRequest = containerRequestReflector.getContainerRequest(resource, priority, ""); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + assertFalse(containerRequest instanceof ContainerRequestWithConstructor); + } + + @Test + public void testGetContainerRequestIfConstructorAbsent() { + final ContainerRequestReflector containerRequestReflector = + new ContainerRequestReflector(ContainerRequestWithoutConstructor.class); + Resource resource = Resource.newInstance(100, 1); + Priority priority = Priority.newInstance(1); + + AMRMClient.ContainerRequest containerRequest = + containerRequestReflector.getContainerRequest(resource, priority, "GPU"); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + assertFalse(containerRequest instanceof ContainerRequestWithConstructor); + + containerRequest = containerRequestReflector.getContainerRequest(resource, priority, null); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + assertFalse(containerRequest instanceof ContainerRequestWithConstructor); + + containerRequest = containerRequestReflector.getContainerRequest(resource, priority, ""); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + assertFalse(containerRequest instanceof ContainerRequestWithConstructor); + } + + @Test + public void testGetContainerRequestWithoutYarnSupport() { + assumeTrue(HadoopUtils.isMaxHadoopVersion(2, 6)); + + Resource resource = Resource.newInstance(100, 1); + Priority priority = Priority.newInstance(1); + + AMRMClient.ContainerRequest containerRequest = + ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, "GPU"); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + + containerRequest = + ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, null); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + + containerRequest = + ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, ""); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + } + + @Test + public void testGetContainerRequestWithYarnSupport() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + assumeTrue(HadoopUtils.isMinHadoopVersion(2, 6)); + + Resource resource = Resource.newInstance(100, 1); + Priority priority = Priority.newInstance(1); + + AMRMClient.ContainerRequest containerRequest = + ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, "GPU"); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + assertEquals("GPU", getNodeLabelExpressionWithReflector(containerRequest)); + + containerRequest = + ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, null); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + assertNull(getNodeLabelExpressionWithReflector(containerRequest)); + + containerRequest = + ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, ""); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + assertNull(getNodeLabelExpressionWithReflector(containerRequest)); + } + + private String getNodeLabelExpressionWithReflector(AMRMClient.ContainerRequest containerRequest) + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Method method = containerRequest.getClass().getMethod("getNodeLabelExpression"); + return (String) method.invoke(containerRequest); + } + + /** Class which does not have required constructor. */ + private static class ContainerRequestWithoutConstructor extends AMRMClient.ContainerRequest { + + public ContainerRequestWithoutConstructor( + Resource capability, String[] nodes, String[] racks, Priority priority) { + super(capability, nodes, racks, priority); + } + } + + /** + * Class which has constructor with the same signature as {@link + * org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest} in Hadoop 2.6+. + */ + private static class ContainerRequestWithConstructor extends AMRMClient.ContainerRequest { + private boolean relaxLocality; Review comment: nit: Just remove this field at the moment. Once we need it, we can add this back. ########## File path: flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.junit.Test; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +/** Tests for {@link ContainerRequestReflector}. */ +public class ContainerRequestReflectorTest extends TestLogger { + + @Test + public void testGetContainerRequestIfConstructorPresent() { + final ContainerRequestReflector containerRequestReflector = + new ContainerRequestReflector(ContainerRequestWithConstructor.class); + Resource resource = Resource.newInstance(100, 1); + Priority priority = Priority.newInstance(1); + + AMRMClient.ContainerRequest containerRequest = + containerRequestReflector.getContainerRequest(resource, priority, "GPU"); + assertTrue(containerRequest instanceof ContainerRequestWithConstructor); Review comment: I think we might not need to verify it. Same as below. ########## File path: flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.junit.Test; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +/** Tests for {@link ContainerRequestReflector}. */ +public class ContainerRequestReflectorTest extends TestLogger { + + @Test + public void testGetContainerRequestIfConstructorPresent() { + final ContainerRequestReflector containerRequestReflector = + new ContainerRequestReflector(ContainerRequestWithConstructor.class); + Resource resource = Resource.newInstance(100, 1); + Priority priority = Priority.newInstance(1); + + AMRMClient.ContainerRequest containerRequest = + containerRequestReflector.getContainerRequest(resource, priority, "GPU"); + assertTrue(containerRequest instanceof ContainerRequestWithConstructor); + ContainerRequestWithConstructor containerRequestWithConstructor = + (ContainerRequestWithConstructor) containerRequest; + assertEquals("GPU", containerRequestWithConstructor.getNodeLabelsExpression()); + + containerRequest = containerRequestReflector.getContainerRequest(resource, priority, null); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + assertFalse(containerRequest instanceof ContainerRequestWithConstructor); + + containerRequest = containerRequestReflector.getContainerRequest(resource, priority, ""); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + assertFalse(containerRequest instanceof ContainerRequestWithConstructor); Review comment: Same as below. ########## File path: flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.junit.Test; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +/** Tests for {@link ContainerRequestReflector}. */ +public class ContainerRequestReflectorTest extends TestLogger { + + @Test + public void testGetContainerRequestIfConstructorPresent() { + final ContainerRequestReflector containerRequestReflector = + new ContainerRequestReflector(ContainerRequestWithConstructor.class); + Resource resource = Resource.newInstance(100, 1); + Priority priority = Priority.newInstance(1); + + AMRMClient.ContainerRequest containerRequest = + containerRequestReflector.getContainerRequest(resource, priority, "GPU"); + assertTrue(containerRequest instanceof ContainerRequestWithConstructor); + ContainerRequestWithConstructor containerRequestWithConstructor = + (ContainerRequestWithConstructor) containerRequest; + assertEquals("GPU", containerRequestWithConstructor.getNodeLabelsExpression()); + + containerRequest = containerRequestReflector.getContainerRequest(resource, priority, null); + assertTrue(containerRequest instanceof AMRMClient.ContainerRequest); + assertFalse(containerRequest instanceof ContainerRequestWithConstructor); Review comment: Same as below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
