Repository: incubator-apex-core Updated Branches: refs/heads/master ddb7471ed -> 5786866e5
APEXCORE-434: Keep track of the filters that have already been added to the Jersey client by using wrapper methods, because the Jersey Client's own method for checking whether a filter has been added throws a ClassCastException. APEXCORE-444: With connection pooling, the stram authentication token corresponding to an application was not always the one used for the REST calls; fixed it to always use the correct token. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/95bca45a Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/95bca45a Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/95bca45a Branch: refs/heads/master Commit: 95bca45ada1816243320d7c89e17b1701bf0a048 Parents: 9b6f65d Author: Pramod Immaneni <pra...@datatorrent.com> Authored: Fri Apr 15 17:53:59 2016 -0700 Committer: Pramod Immaneni <pra...@datatorrent.com> Committed: Mon Apr 25 13:34:49 2016 -0700 ---------------------------------------------------------------------- .../datatorrent/stram/client/StramAgent.java | 13 +++---- .../stram/util/WebServicesClient.java | 24 +++++++++++++ .../stram/util/WebServicesClientTest.java | 38 ++++++++++++++++++++ 3 files changed, 66 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/95bca45a/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java b/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java index ab26e17..7acd232 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java @@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils; 
import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientResponse; @@ -198,8 +197,8 @@ public class StramAgent extends FSAgent private UriBuilder getStramWebURIBuilder(WebServicesClient webServicesClient, String appid) throws IncompatibleVersionException { - Client wsClient = webServicesClient.getClient(); - wsClient.setFollowRedirects(true); + webServicesClient.getClient().setFollowRedirects(true); + webServicesClient.clearFilters(); StramWebServicesInfo info = getWebServicesInfo(appid); UriBuilder ub = null; if (info != null) { @@ -209,14 +208,10 @@ public class StramAgent extends FSAgent WebServicesVersionConversion.Converter versionConverter = WebServicesVersionConversion.getConverter(info.version); if (versionConverter != null) { VersionConversionFilter versionConversionFilter = new VersionConversionFilter(versionConverter); - if (!wsClient.isFilterPreset(versionConversionFilter)) { - wsClient.addFilter(versionConversionFilter); - } + webServicesClient.addFilter(versionConversionFilter); } if (info.securityInfo != null) { - if (!wsClient.isFilterPreset(info.securityInfo.secClientFilter)) { - wsClient.addFilter(info.securityInfo.secClientFilter); - } + webServicesClient.addFilter(info.securityInfo.secClientFilter); } } return ub; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/95bca45a/engine/src/main/java/com/datatorrent/stram/util/WebServicesClient.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/util/WebServicesClient.java b/engine/src/main/java/com/datatorrent/stram/util/WebServicesClient.java index 025f099..1712080 100644 --- a/engine/src/main/java/com/datatorrent/stram/util/WebServicesClient.java +++ b/engine/src/main/java/com/datatorrent/stram/util/WebServicesClient.java @@ -20,6 +20,8 @@ 
package com.datatorrent.stram.util; import java.io.IOException; import java.security.Principal; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.Future; import org.slf4j.Logger; @@ -44,6 +46,7 @@ import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.async.ITypeListener; import com.sun.jersey.api.client.config.ClientConfig; import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.api.client.filter.ClientFilter; import com.sun.jersey.client.apache4.ApacheHttpClient4Handler; /** @@ -63,6 +66,8 @@ public class WebServicesClient private final Client client; + private final Set<ClientFilter> clientFilters = new HashSet<>(); + static { connectionManager = new PoolingHttpClientConnectionManager(); connectionManager.setMaxTotal(200); @@ -121,6 +126,25 @@ public class WebServicesClient return client; } + // A bug in jersey Client results in a ClassCastException when using the built-in method in Client to check if + // the filter is already present. Hence using a wrapper method to keep track of added filters. 
+ public boolean isFilterPresent(ClientFilter clientFilter) + { + return clientFilters.contains(clientFilter); + } + + public void addFilter(ClientFilter clientFilter) + { + client.addFilter(clientFilter); + clientFilters.add(clientFilter); + } + + public void clearFilters() + { + client.removeAllFilters(); + clientFilters.clear(); + } + public <T> T process(String url, Class<T> clazz, WebServicesHandler<T> handler) throws IOException { WebResource wr = client.resource(url); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/95bca45a/engine/src/test/java/com/datatorrent/stram/util/WebServicesClientTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/util/WebServicesClientTest.java b/engine/src/test/java/com/datatorrent/stram/util/WebServicesClientTest.java new file mode 100644 index 0000000..f07a7cf --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/util/WebServicesClientTest.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.stram.util; + +import org.junit.Assert; +import org.junit.Test; + + +/** + * + */ +public class WebServicesClientTest +{ + @Test + public void testFilterPresent() + { + WebServicesClient webServicesClient = new WebServicesClient(); + HeaderClientFilter clientFilter = new HeaderClientFilter(); + webServicesClient.addFilter(clientFilter); + Assert.assertTrue("Filter present", webServicesClient.isFilterPresent(clientFilter)); + } +}