KarmaGYZ commented on a change in pull request #18087:
URL: https://github.com/apache/flink/pull/18087#discussion_r771172130



##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ContainerRequestReflector.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Use reflection to determine whether the Hadoop supports node-label, 
depending on the Hadoop
+ * version, may or may not be supported. If not, nothing happened.
+ *
+ * <p>The node label mechanism is supported by Hadoop version greater than 
2.6.0
+ */
+class ContainerRequestReflector {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ContainerRequestReflector.class);
+
+    static final ContainerRequestReflector INSTANCE = new 
ContainerRequestReflector();
+
+    @Nullable private Constructor<? extends AMRMClient.ContainerRequest> 
defaultConstructor;
+
+    private ContainerRequestReflector() {
+        this(AMRMClient.ContainerRequest.class);
+    }
+
+    @VisibleForTesting
+    ContainerRequestReflector(Class<? extends AMRMClient.ContainerRequest> 
containerRequestClass) {
+        Class<? extends AMRMClient.ContainerRequest> requestCls = 
containerRequestClass;
+        try {
+            /**
+             * To support node-label, using the below constructor. Please 
refer to
+             * 
https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java#L287
+             *
+             * <p>Instantiates a {@link ContainerRequest} with the given 
constraints.
+             *
+             * @param capability The {@link Resource} to be requested for each 
container.
+             * @param nodes Any hosts to request that the containers are 
placed on.
+             * @param racks Any racks to request that the containers are 
placed on. The racks
+             *     corresponding to any hosts requested will be automatically 
added to this list.
+             * @param priority The priority at which to request the 
containers. Higher priorities
+             *     have lower numerical values.
+             * @param allocationRequestId The allocationRequestId of the 
request. To be used as a
+             *     tracking id to match Containers allocated against this 
request. Will default to 0
+             *     if not specified.
+             * @param relaxLocality If true, containers for this request may 
be assigned on hosts
+             *     and racks other than the ones explicitly requested.
+             * @param nodeLabelsExpression Set node labels to allocate 
resource, now we only support
+             *     asking for only a single node label
+             */
+            defaultConstructor =
+                    requestCls.getDeclaredConstructor(
+                            Resource.class,
+                            String[].class,
+                            String[].class,
+                            Priority.class,
+                            boolean.class,
+                            String.class);
+        } catch (NoSuchMethodException exception) {
+            LOG.info("The node-label mechanism of Yarn don't be supported in 
this Hadoop version.");

Review comment:
       ```suggestion
               LOG.debug("The node-label mechanism of Yarn don't be supported 
in this Hadoop version.");
   ```

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/** Tests for {@link ContainerRequestReflector}. */
+public class ContainerRequestReflectorTest extends TestLogger {
+
+    @Test
+    public void testGetContainerRequestIfConstructorPresent() {
+        final ContainerRequestReflector containerRequestReflector =
+                new 
ContainerRequestReflector(ContainerRequestWithConstructor.class);
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        AMRMClient.ContainerRequest containerRequest =
+                containerRequestReflector.getContainerRequest(resource, 
priority, "GPU");
+        assertTrue(containerRequest instanceof 
ContainerRequestWithConstructor);
+        ContainerRequestWithConstructor containerRequestWithConstructor =
+                (ContainerRequestWithConstructor) containerRequest;
+        assertEquals("GPU", 
containerRequestWithConstructor.getNodeLabelsExpression());
+
+        containerRequest = 
containerRequestReflector.getContainerRequest(resource, priority, null);
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+        assertFalse(containerRequest instanceof 
ContainerRequestWithConstructor);

Review comment:
       Asserting that `containerRequestWithConstructor.getNodeLabelsExpression()` 
is null would be good enough.

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/** Tests for {@link ContainerRequestReflector}. */
+public class ContainerRequestReflectorTest extends TestLogger {
+
+    @Test
+    public void testGetContainerRequestIfConstructorPresent() {
+        final ContainerRequestReflector containerRequestReflector =
+                new 
ContainerRequestReflector(ContainerRequestWithConstructor.class);
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        AMRMClient.ContainerRequest containerRequest =
+                containerRequestReflector.getContainerRequest(resource, 
priority, "GPU");
+        assertTrue(containerRequest instanceof 
ContainerRequestWithConstructor);
+        ContainerRequestWithConstructor containerRequestWithConstructor =
+                (ContainerRequestWithConstructor) containerRequest;
+        assertEquals("GPU", 
containerRequestWithConstructor.getNodeLabelsExpression());
+
+        containerRequest = 
containerRequestReflector.getContainerRequest(resource, priority, null);
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+        assertFalse(containerRequest instanceof 
ContainerRequestWithConstructor);
+
+        containerRequest = 
containerRequestReflector.getContainerRequest(resource, priority, "");
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+        assertFalse(containerRequest instanceof 
ContainerRequestWithConstructor);

Review comment:
       Asserting that `containerRequestWithConstructor.getNodeLabelsExpression()` 
is null would be good enough.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
##########
@@ -296,6 +296,14 @@
                     .noDefaultValue()
                     .withDescription("Specify YARN node label for the YARN 
application.");
 
+    public static final ConfigOption<String> TASK_MANAGER_NODE_LABEL =
+            key("yarn.taskmanager.node-label")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specify YARN node label for the Flink 
TaskManagers, "
+                                    + "it will override the 
yarn.application.node-label if both are set.");

Review comment:
       nit: From my understanding, it will only override the node label for 
TMs. It would be good to clarify it.

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/** Tests for {@link ContainerRequestReflector}. */
+public class ContainerRequestReflectorTest extends TestLogger {
+
+    @Test
+    public void testGetContainerRequestIfConstructorPresent() {
+        final ContainerRequestReflector containerRequestReflector =
+                new 
ContainerRequestReflector(ContainerRequestWithConstructor.class);
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        AMRMClient.ContainerRequest containerRequest =
+                containerRequestReflector.getContainerRequest(resource, 
priority, "GPU");
+        assertTrue(containerRequest instanceof 
ContainerRequestWithConstructor);
+        ContainerRequestWithConstructor containerRequestWithConstructor =
+                (ContainerRequestWithConstructor) containerRequest;
+        assertEquals("GPU", 
containerRequestWithConstructor.getNodeLabelsExpression());
+
+        containerRequest = 
containerRequestReflector.getContainerRequest(resource, priority, null);
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+        assertFalse(containerRequest instanceof 
ContainerRequestWithConstructor);
+
+        containerRequest = 
containerRequestReflector.getContainerRequest(resource, priority, "");
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+        assertFalse(containerRequest instanceof 
ContainerRequestWithConstructor);
+    }
+
+    @Test
+    public void testGetContainerRequestIfConstructorAbsent() {
+        final ContainerRequestReflector containerRequestReflector =
+                new 
ContainerRequestReflector(ContainerRequestWithoutConstructor.class);
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        AMRMClient.ContainerRequest containerRequest =
+                containerRequestReflector.getContainerRequest(resource, 
priority, "GPU");
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+        assertFalse(containerRequest instanceof 
ContainerRequestWithConstructor);
+
+        containerRequest = 
containerRequestReflector.getContainerRequest(resource, priority, null);
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+        assertFalse(containerRequest instanceof 
ContainerRequestWithConstructor);
+
+        containerRequest = 
containerRequestReflector.getContainerRequest(resource, priority, "");
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+        assertFalse(containerRequest instanceof 
ContainerRequestWithConstructor);
+    }
+
+    @Test
+    public void testGetContainerRequestWithoutYarnSupport() {
+        assumeTrue(HadoopUtils.isMaxHadoopVersion(2, 6));
+
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        AMRMClient.ContainerRequest containerRequest =
+                
ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, 
"GPU");
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+
+        containerRequest =
+                
ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, 
null);
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+
+        containerRequest =
+                
ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, "");
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+    }
+
+    @Test
+    public void testGetContainerRequestWithYarnSupport()
+            throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+        assumeTrue(HadoopUtils.isMinHadoopVersion(2, 6));
+
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        AMRMClient.ContainerRequest containerRequest =
+                
ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, 
"GPU");
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+        assertEquals("GPU", 
getNodeLabelExpressionWithReflector(containerRequest));
+
+        containerRequest =
+                
ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, 
null);
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+        assertNull(getNodeLabelExpressionWithReflector(containerRequest));
+
+        containerRequest =
+                
ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, "");
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+        assertNull(getNodeLabelExpressionWithReflector(containerRequest));
+    }
+
+    private String 
getNodeLabelExpressionWithReflector(AMRMClient.ContainerRequest 
containerRequest)
+            throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+        Method method = 
containerRequest.getClass().getMethod("getNodeLabelExpression");
+        return (String) method.invoke(containerRequest);
+    }
+
+    /** Class which does not have required constructor. */
+    private static class ContainerRequestWithoutConstructor extends 
AMRMClient.ContainerRequest {
+
+        public ContainerRequestWithoutConstructor(
+                Resource capability, String[] nodes, String[] racks, Priority 
priority) {
+            super(capability, nodes, racks, priority);
+        }
+    }
+
+    /**
+     * Class which has constructor with the same signature as {@link
+     * org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest} in 
Hadoop 2.6+.
+     */
+    private static class ContainerRequestWithConstructor extends 
AMRMClient.ContainerRequest {
+        private boolean relaxLocality;

Review comment:
       nit: Just remove this field for now. Once we need it, we can add 
it back.

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/** Tests for {@link ContainerRequestReflector}. */
+public class ContainerRequestReflectorTest extends TestLogger {
+
+    @Test
+    public void testGetContainerRequestIfConstructorPresent() {
+        final ContainerRequestReflector containerRequestReflector =
+                new 
ContainerRequestReflector(ContainerRequestWithConstructor.class);
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        AMRMClient.ContainerRequest containerRequest =
+                containerRequestReflector.getContainerRequest(resource, 
priority, "GPU");
+        assertTrue(containerRequest instanceof 
ContainerRequestWithConstructor);

Review comment:
       I think we might not need to verify it. Same as below.

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/** Tests for {@link ContainerRequestReflector}. */
+public class ContainerRequestReflectorTest extends TestLogger {
+
+    @Test
+    public void testGetContainerRequestIfConstructorPresent() {
+        final ContainerRequestReflector containerRequestReflector =
+                new 
ContainerRequestReflector(ContainerRequestWithConstructor.class);
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        AMRMClient.ContainerRequest containerRequest =
+                containerRequestReflector.getContainerRequest(resource, 
priority, "GPU");
+        assertTrue(containerRequest instanceof 
ContainerRequestWithConstructor);
+        ContainerRequestWithConstructor containerRequestWithConstructor =
+                (ContainerRequestWithConstructor) containerRequest;
+        assertEquals("GPU", 
containerRequestWithConstructor.getNodeLabelsExpression());
+
+        containerRequest = 
containerRequestReflector.getContainerRequest(resource, priority, null);
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+        assertFalse(containerRequest instanceof 
ContainerRequestWithConstructor);
+
+        containerRequest = 
containerRequestReflector.getContainerRequest(resource, priority, "");
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+        assertFalse(containerRequest instanceof 
ContainerRequestWithConstructor);

Review comment:
       Same as below.

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/** Tests for {@link ContainerRequestReflector}. */
+public class ContainerRequestReflectorTest extends TestLogger {
+
+    @Test
+    public void testGetContainerRequestIfConstructorPresent() {
+        final ContainerRequestReflector containerRequestReflector =
+                new 
ContainerRequestReflector(ContainerRequestWithConstructor.class);
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        AMRMClient.ContainerRequest containerRequest =
+                containerRequestReflector.getContainerRequest(resource, 
priority, "GPU");
+        assertTrue(containerRequest instanceof 
ContainerRequestWithConstructor);
+        ContainerRequestWithConstructor containerRequestWithConstructor =
+                (ContainerRequestWithConstructor) containerRequest;
+        assertEquals("GPU", 
containerRequestWithConstructor.getNodeLabelsExpression());
+
+        containerRequest = 
containerRequestReflector.getContainerRequest(resource, priority, null);
+        assertTrue(containerRequest instanceof AMRMClient.ContainerRequest);
+        assertFalse(containerRequest instanceof 
ContainerRequestWithConstructor);

Review comment:
       Same as below.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to