Repository: incubator-reef Updated Branches: refs/heads/master 1da62700f -> 84feeaf6a
[REEF-474]: Implement LocalNameResolver looking up local NameServer. This pull request addressed the issue by * Implementing `LocalNameResolverImpl` looking up local `NameServer`. JIRA: [REEF-474](https://issues.apache.org/jira/browse/REEF-474) Pull Request: This closes #294 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/84feeaf6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/84feeaf6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/84feeaf6 Branch: refs/heads/master Commit: 84feeaf6a3179b0b3307ca6ac8b126446c87f016 Parents: 1da6270 Author: taegeonum <[email protected]> Authored: Mon Jul 13 12:00:32 2015 +0900 Committer: Brian Cho <[email protected]> Committed: Fri Jul 31 11:57:40 2015 +0900 ---------------------------------------------------------------------- .../naming/LocalNameResolverConfiguration.java | 55 +++++++++ .../network/naming/LocalNameResolverImpl.java | 123 +++++++++++++++++++ .../services/network/LocalNameResolverTest.java | 104 ++++++++++++++++ 3 files changed, 282 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84feeaf6/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverConfiguration.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverConfiguration.java new file mode 100644 index 0000000..7713e46 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverConfiguration.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.io.network.naming; + +import org.apache.reef.io.network.naming.parameters.NameResolverCacheTimeout; +import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount; +import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout; +import org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.tang.formats.ConfigurationModuleBuilder; +import org.apache.reef.tang.formats.OptionalParameter; + +/** + * Configuration Module Builder for LocalNameResolver. + */ +public final class LocalNameResolverConfiguration extends ConfigurationModuleBuilder { + + /** + * The timeout of caching lookup. + */ + public static final OptionalParameter<Long> CACHE_TIMEOUT = new OptionalParameter<>(); + + /** + * The timeout of retrying connection. + */ + public static final OptionalParameter<Integer> RETRY_TIMEOUT = new OptionalParameter<>(); + + /** + * The number of retrying connection. + */ + public static final OptionalParameter<Integer> RETRY_COUNT = new OptionalParameter<>(); + + public static final ConfigurationModule CONF = new LocalNameResolverConfiguration() + .bindNamedParameter(NameResolverCacheTimeout.class, CACHE_TIMEOUT) + .bindNamedParameter(NameResolverRetryTimeout.class, RETRY_TIMEOUT) + .bindNamedParameter(NameResolverRetryCount.class, RETRY_COUNT) + .bindImplementation(NameResolver.class, LocalNameResolverImpl.class) + .build(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84feeaf6/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverImpl.java new file mode 100644 index 0000000..1aa3de7 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverImpl.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.naming; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.naming.exception.NamingException; +import org.apache.reef.io.network.naming.parameters.NameResolverCacheTimeout; +import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount; +import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.util.cache.Cache; +import org.apache.reef.wake.Identifier; + +import javax.inject.Inject; +import java.net.InetSocketAddress; +import java.util.concurrent.Callable; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * NameResolver looking up local name server. + * This class should be used when the NameServer is started locally. + */ +public final class LocalNameResolverImpl implements NameResolver { + + private static final Logger LOG = Logger.getLogger(LocalNameResolverImpl.class.getName()); + + /** + * A local name server. + */ + private final NameServer nameServer; + + /** + * A cache for lookup. + */ + private final Cache<Identifier, InetSocketAddress> cache; + + /** + * Retry count for lookup. + */ + private final int retryCount; + + /** + * Retry timeout for lookup. + */ + private final int retryTimeout; + + @Inject + private LocalNameResolverImpl( + final NameServer nameServer, + @Parameter(NameResolverCacheTimeout.class) final long timeout, + @Parameter(NameResolverRetryCount.class) final int retryCount, + @Parameter(NameResolverRetryTimeout.class) final int retryTimeout) { + this.nameServer = nameServer; + this.cache = new NameCache(timeout); + this.retryCount = retryCount; + this.retryTimeout = retryTimeout; + } + + @Override + public synchronized void register(final Identifier id, final InetSocketAddress address) throws NetworkException { + nameServer.register(id, address); + } + + @Override + public synchronized void unregister(final Identifier id) throws NetworkException { + nameServer.unregister(id); + } + + @Override + public void close() throws Exception { + } + + @Override + public InetSocketAddress lookup(final Identifier id) throws Exception { + return cache.get(id, new Callable<InetSocketAddress>() { + @Override + public InetSocketAddress call() throws Exception { + final int origRetryCount = LocalNameResolverImpl.this.retryCount; + int retriesLeft = origRetryCount; + while (true) { + try { + final InetSocketAddress addr = nameServer.lookup(id); + if (addr == null) { + throw new NullPointerException(); + } else { + return addr; + } + } catch (final NullPointerException e) { + if (retriesLeft <= 0) { + throw new NamingException("Cannot find " + id + " from the name server"); + } else { + final int retTimeout = LocalNameResolverImpl.this.retryTimeout + * (origRetryCount - retriesLeft + 1); + LOG.log(Level.WARNING, + "Caught Naming Exception while looking up " + id + + " with Name Server. Will retry " + retriesLeft + + " time(s) after waiting for " + retTimeout + " msec."); + Thread.sleep(retTimeout * retriesLeft); + --retriesLeft; + } + } + } + } + }); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84feeaf6/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java new file mode 100644 index 0000000..47c8700 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.services.network; + +import org.apache.reef.io.network.naming.LocalNameResolverConfiguration; +import org.apache.reef.io.network.naming.NameResolver; +import org.apache.reef.io.network.naming.exception.NamingException; +import org.apache.reef.io.network.util.StringIdentifierFactory; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.util.concurrent.ExecutionException; + +public class LocalNameResolverTest { + + private final LocalAddressProvider localAddressProvider; + + public LocalNameResolverTest() throws InjectionException { + this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + } + + /** + * Test method for {@link org.apache.reef.io.network.naming.LocalNameResolverImpl#close()}. + * + * @throws Exception + */ + @Test + public final void testClose() throws Exception { + final String localAddress = localAddressProvider.getLocalAddress(); + final IdentifierFactory factory = new StringIdentifierFactory(); + try (final NameResolver resolver = Tang.Factory.getTang().newInjector(LocalNameResolverConfiguration.CONF + .set(LocalNameResolverConfiguration.CACHE_TIMEOUT, 10000) + .build()).getInstance(NameResolver.class)) { + final Identifier id = factory.getNewInstance("Task1"); + resolver.register(id, new InetSocketAddress(localAddress, 7001)); + resolver.unregister(id); + Thread.sleep(100); + } + } + + /** + * Test method for {@link org.apache.reef.io.network.naming.LocalNameResolverImpl#lookup(Identifier id)}. + * To check caching behavior with expireAfterAccess & expireAfterWrite + * Changing NameCache's pattern to expireAfterAccess causes this test to fail + * + * @throws Exception + */ + @Test + public final void testLookup() throws Exception { + final IdentifierFactory factory = new StringIdentifierFactory(); + final String localAddress = localAddressProvider.getLocalAddress(); + try (final NameResolver resolver = Tang.Factory.getTang().newInjector(LocalNameResolverConfiguration.CONF + .set(LocalNameResolverConfiguration.CACHE_TIMEOUT, 150) + .build()).getInstance(NameResolver.class)) { + final Identifier id = factory.getNewInstance("Task1"); + final InetSocketAddress socketAddr = new InetSocketAddress(localAddress, 7001); + resolver.register(id, socketAddr); + InetSocketAddress lookupAddr = resolver.lookup(id); // caches the entry + Assert.assertTrue(socketAddr.equals(lookupAddr)); + resolver.unregister(id); + Thread.sleep(100); + try { + lookupAddr = resolver.lookup(id); + Thread.sleep(100); + //With expireAfterAccess, the previous lookup would reset expiry to 150ms + //more and 100ms wait will not expire the item and will return the cached value + //With expireAfterWrite, the extra wait of 100 ms will expire the item + //resulting in NamingException and the test passes + lookupAddr = resolver.lookup(id); + Assert.assertNull("resolver.lookup(id)", lookupAddr); + } catch (final Exception e) { + if (e instanceof ExecutionException) { + Assert.assertTrue("Execution Exception cause is instanceof NamingException", + e.getCause() instanceof NamingException); + } else { + throw e; + } + } + } + } +}
