Repository: tez Updated Branches: refs/heads/master 82d73b380 -> 75bc7c157
TEZ-3874. NPE in TezClientUtils when "yarn.resourcemanager.zk-address" is present in Configuration. (Eric Wohlstadter via jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/75bc7c15 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/75bc7c15 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/75bc7c15 Branch: refs/heads/master Commit: 75bc7c157682cc64eccfa4722226a8ac72161f17 Parents: 82d73b3 Author: Jason Lowe <[email protected]> Authored: Fri Mar 9 10:00:21 2018 -0600 Committer: Jason Lowe <[email protected]> Committed: Fri Mar 9 10:00:21 2018 -0600 ---------------------------------------------------------------------- .../org/apache/tez/client/TezClientUtils.java | 14 +++++--- .../java/org/apache/tez/common/TezUtils.java | 38 ++++++++++++++------ .../main/java/org/apache/tez/dag/api/DAG.java | 15 ++------ .../org/apache/tez/client/TestTezClient.java | 13 +++++++ .../org/apache/tez/common/TestTezUtils.java | 15 ++++++++ 5 files changed, 68 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/75bc7c15/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index caf610d..689d947 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -807,10 +807,16 @@ public class TezClientUtils { assert amConf != null; ConfigurationProto.Builder builder = ConfigurationProto.newBuilder(); for (Entry<String, String> entry : amConf) { - PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder(); - kvp.setKey(entry.getKey()); - kvp.setValue(amConf.get(entry.getKey())); - builder.addConfKeyValues(kvp); + String key = entry.getKey(); + String val = amConf.get(key); + if(val != null) { + PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder(); + kvp.setKey(key); + kvp.setValue(val); + builder.addConfKeyValues(kvp); + } else { + LOG.debug("null value in Configuration after replacement for key={}. Skipping.", key); + } } AMPluginDescriptorProto pluginDescriptorProto = http://git-wip-us.apache.org/repos/asf/tez/blob/75bc7c15/tez-api/src/main/java/org/apache/tez/common/TezUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java index dfdf9fa..efd4502 100644 --- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java +++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java @@ -134,16 +134,8 @@ public class TezUtils { private static void writeConfInPB(OutputStream dos, Configuration conf) throws IOException { - DAGProtos.ConfigurationProto.Builder confProtoBuilder = DAGProtos.ConfigurationProto - .newBuilder(); - Iterator<Map.Entry<String, String>> iter = conf.iterator(); - while (iter.hasNext()) { - Map.Entry<String, String> entry = iter.next(); - DAGProtos.PlanKeyValuePair.Builder kvp = DAGProtos.PlanKeyValuePair.newBuilder(); - kvp.setKey(entry.getKey()); - kvp.setValue(entry.getValue()); - confProtoBuilder.addConfKeyValues(kvp); - } + DAGProtos.ConfigurationProto.Builder confProtoBuilder = DAGProtos.ConfigurationProto.newBuilder(); + populateConfProtoFromEntries(conf, confProtoBuilder); DAGProtos.ConfigurationProto confProto = confProtoBuilder.build(); confProto.writeTo(dos); } @@ -167,7 +159,13 @@ public class TezUtils { Iterator<Entry<String, String>> iter = conf.iterator(); while (iter.hasNext()) { Entry<String, String> entry = iter.next(); - confJson.put(entry.getKey(), conf.get(entry.getKey())); + String key = entry.getKey(); + String val = conf.get(entry.getKey()); + if(val != null) { + confJson.put(key, val); + } else { + LOG.debug("null value in Configuration after replacement for key={}. Skipping.", key); + } } jsonObject.put(ATSConstants.CONFIG, confJson); } @@ -181,4 +179,22 @@ public class TezUtils { return convertToHistoryText(null, conf); } + + /* Copy each Map.Entry with non-null value to DAGProtos.ConfigurationProto */ + public static void populateConfProtoFromEntries(Iterable<Map.Entry<String, String>> params, + DAGProtos.ConfigurationProto.Builder confBuilder) { + for(Map.Entry<String, String> entry : params) { + String key = entry.getKey(); + String val = entry.getValue(); + if(val != null) { + DAGProtos.PlanKeyValuePair.Builder kvp = DAGProtos.PlanKeyValuePair.newBuilder(); + kvp.setKey(key); + kvp.setValue(val); + confBuilder.addConfKeyValues(kvp); + } else { + LOG.debug("null value for key={}. Skipping.", key); + } + } + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/75bc7c15/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index cdfa3b2..735c749 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.client.CallerContext; import org.apache.tez.common.JavaOptsChecker; +import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.Vertex.VertexExecutionContext; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; @@ -985,12 +986,7 @@ public class DAG { if (vertex.getConf()!= null && vertex.getConf().size() > 0) { ConfigurationProto.Builder confBuilder = ConfigurationProto.newBuilder(); - for (Map.Entry<String, String> entry : vertex.getConf().entrySet()) { - PlanKeyValuePair.Builder keyValueBuilder = PlanKeyValuePair.newBuilder(); - keyValueBuilder.setKey(entry.getKey()); - keyValueBuilder.setValue(entry.getValue()); - confBuilder.addConfKeyValues(keyValueBuilder); - } + TezUtils.populateConfProtoFromEntries(vertex.getConf().entrySet(), confBuilder); vertexBuilder.setVertexConf(confBuilder); } @@ -1091,12 +1087,7 @@ public class DAG { ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder(); if (!this.dagConf.isEmpty()) { - for (Entry<String, String> entry : this.dagConf.entrySet()) { - PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder(); - kvp.setKey(entry.getKey()); - kvp.setValue(entry.getValue()); - confProtoBuilder.addConfKeyValues(kvp); - } + TezUtils.populateConfProtoFromEntries(this.dagConf.entrySet(), confProtoBuilder); } // Copy historyLogLevel from tezConf into dagConf if its not overridden in dagConf. String logLevel = this.dagConf.get(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL); http://git-wip-us.apache.org/repos/asf/tez/blob/75bc7c15/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index 0cbef76..2c04061 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -90,6 +90,7 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRespo import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto; +import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; import org.hamcrest.CoreMatchers; import org.junit.Assert; @@ -927,4 +928,16 @@ public class TestTezClient { Thread.sleep(3 * amHeartBeatTimeoutSecs * 1000); assertTrue(client.getAMKeepAliveService().isTerminated()); } + + //See TEZ-3874 + @Test(timeout = 5000) + public void testYarnZkDeprecatedConf() { + Configuration conf = new Configuration(false); + String val = "hostname:2181"; + conf.set("yarn.resourcemanager.zk-address", val); + + ConfigurationProto confProto = null; + //Test that Exception is not thrown by createFinalConfProtoForApp + TezClientUtils.createFinalConfProtoForApp(conf, null); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/75bc7c15/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java ---------------------------------------------------------------------- diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java index 61bb9a7..16efc8f 100644 --- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java +++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java @@ -20,14 +20,19 @@ package org.apache.tez.common; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertEquals; import java.io.IOException; import java.util.BitSet; +import java.util.HashMap; +import java.util.Map; import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.records.DAGProtos; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.junit.Assert; @@ -230,4 +235,14 @@ public class TestTezUtils { } + @Test(timeout = 5000) + public void testPopulateConfProtoFromEntries() { + Map<String, String> map = new HashMap<>(); + map.put("nonNullKey", "value"); + map.put("nullKey", null); + DAGProtos.ConfigurationProto.Builder confBuilder = DAGProtos.ConfigurationProto.newBuilder(); + TezUtils.populateConfProtoFromEntries(map.entrySet(), confBuilder); + assertEquals(confBuilder.getConfKeyValuesList().size(), 1); + } + }
